You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/01/17 06:34:18 UTC

hbase git commit: HBASE-17372 Make AsyncTable thread safe

Repository: hbase
Updated Branches:
  refs/heads/master 4cb09a494 -> 4ab95ebbc


HBASE-17372 Make AsyncTable thread safe


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4ab95ebb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4ab95ebb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4ab95ebb

Branch: refs/heads/master
Commit: 4ab95ebbceb144d90e03bce45afa52bcb8c62c54
Parents: 4cb09a4
Author: zhangduo <zh...@apache.org>
Authored: Tue Jan 17 09:55:23 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue Jan 17 14:33:28 2017 +0800

----------------------------------------------------------------------
 .../client/AsyncBatchRpcRetryingCaller.java     |  54 ++-------
 .../hadoop/hbase/client/AsyncClientScanner.java |  21 +++-
 .../hadoop/hbase/client/AsyncConnection.java    |  39 ++++--
 .../client/AsyncConnectionConfiguration.java    |  18 ++-
 .../hbase/client/AsyncConnectionImpl.java       |  27 +++--
 .../hadoop/hbase/client/AsyncRegionLocator.java |   2 +-
 .../client/AsyncRpcRetryingCallerFactory.java   | 102 ++++++++++++----
 .../AsyncScanSingleRegionRpcRetryingCaller.java |   4 +-
 .../AsyncSingleRequestRpcRetryingCaller.java    |   5 +-
 .../client/AsyncSmallScanRpcRetryingCaller.java |  15 ++-
 .../apache/hadoop/hbase/client/AsyncTable.java  |   6 +-
 .../hadoop/hbase/client/AsyncTableBase.java     | 118 ++++++-------------
 .../hadoop/hbase/client/AsyncTableBuilder.java  | 113 ++++++++++++++++++
 .../hbase/client/AsyncTableBuilderBase.java     | 111 +++++++++++++++++
 .../hadoop/hbase/client/AsyncTableImpl.java     |  42 +++----
 .../hadoop/hbase/client/ConnectionUtils.java    |  15 +--
 .../hadoop/hbase/client/RawAsyncTable.java      |   2 +
 .../hadoop/hbase/client/RawAsyncTableImpl.java  | 113 ++++++++++--------
 .../org/apache/hadoop/hbase/HConstants.java     |   3 -
 ...TestAsyncSingleRequestRpcRetryingCaller.java |  52 +++-----
 .../client/TestAsyncTableGetMultiThreaded.java  |  28 ++---
 .../hbase/client/TestRawAsyncTableScan.java     |   6 +-
 22 files changed, 583 insertions(+), 313 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 6f0b8e9..9b362d1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
 import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
 
@@ -40,7 +39,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -61,7 +59,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
-import org.apache.hadoop.hbase.util.AtomicUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
@@ -102,9 +99,7 @@ class AsyncBatchRpcRetryingCaller<T> {
 
   private final long operationTimeoutNs;
 
-  private final long readRpcTimeoutNs;
-
-  private final long writeRpcTimeoutNs;
+  private final long rpcTimeoutNs;
 
   private final int startLogErrorsCnt;
 
@@ -128,39 +123,22 @@ class AsyncBatchRpcRetryingCaller<T> {
     public final ConcurrentMap<byte[], RegionRequest> actionsByRegion =
         new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
 
-    public final AtomicLong rpcTimeoutNs;
-
-    public ServerRequest(long defaultRpcTimeoutNs) {
-      this.rpcTimeoutNs = new AtomicLong(defaultRpcTimeoutNs);
-    }
-
-    public void addAction(HRegionLocation loc, Action action, long rpcTimeoutNs) {
+    public void addAction(HRegionLocation loc, Action action) {
       computeIfAbsent(actionsByRegion, loc.getRegionInfo().getRegionName(),
         () -> new RegionRequest(loc)).actions.add(action);
-      // try update the timeout to a larger value
-      if (this.rpcTimeoutNs.get() <= 0) {
-        return;
-      }
-      if (rpcTimeoutNs <= 0) {
-        this.rpcTimeoutNs.set(-1L);
-        return;
-      }
-      AtomicUtils.updateMax(this.rpcTimeoutNs, rpcTimeoutNs);
     }
   }
 
   public AsyncBatchRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
-      TableName tableName, List<? extends Row> actions, long pauseNs, int maxRetries,
-      long operationTimeoutNs, long readRpcTimeoutNs, long writeRpcTimeoutNs,
-      int startLogErrorsCnt) {
+      TableName tableName, List<? extends Row> actions, long pauseNs, int maxAttempts,
+      long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
     this.conn = conn;
     this.tableName = tableName;
     this.pauseNs = pauseNs;
-    this.maxAttempts = retries2Attempts(maxRetries);
+    this.maxAttempts = maxAttempts;
     this.operationTimeoutNs = operationTimeoutNs;
-    this.readRpcTimeoutNs = readRpcTimeoutNs;
-    this.writeRpcTimeoutNs = writeRpcTimeoutNs;
+    this.rpcTimeoutNs = rpcTimeoutNs;
     this.startLogErrorsCnt = startLogErrorsCnt;
 
     this.actions = new ArrayList<>(actions.size());
@@ -366,7 +344,7 @@ class AsyncBatchRpcRetryingCaller<T> {
         return;
       }
       HBaseRpcController controller = conn.rpcControllerFactory.newController();
-      resetController(controller, Math.min(serverReq.rpcTimeoutNs.get(), remainingNs));
+      resetController(controller, Math.min(rpcTimeoutNs, remainingNs));
       if (!cells.isEmpty()) {
         controller.setCellScanner(createCellScanner(cells));
       }
@@ -416,10 +394,6 @@ class AsyncBatchRpcRetryingCaller<T> {
     retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
   }
 
-  private long getRpcTimeoutNs(Action action) {
-    return action.getAction() instanceof Get ? readRpcTimeoutNs : writeRpcTimeoutNs;
-  }
-
   private void groupAndSend(Stream<Action> actions, int tries) {
     long locateTimeoutNs;
     if (operationTimeoutNs > 0) {
@@ -433,15 +407,6 @@ class AsyncBatchRpcRetryingCaller<T> {
     }
     ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>();
     ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();
-    // use the small one as the default timeout value, and increase the timeout value if we have an
-    // action in the group needs a larger timeout value.
-    long defaultRpcTimeoutNs;
-    if (readRpcTimeoutNs > 0) {
-      defaultRpcTimeoutNs =
-          writeRpcTimeoutNs > 0 ? Math.min(readRpcTimeoutNs, writeRpcTimeoutNs) : readRpcTimeoutNs;
-    } else {
-      defaultRpcTimeoutNs = writeRpcTimeoutNs > 0 ? writeRpcTimeoutNs : -1L;
-    }
     CompletableFuture.allOf(actions
         .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
           RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
@@ -454,9 +419,8 @@ class AsyncBatchRpcRetryingCaller<T> {
               addError(action, error, null);
               locateFailed.add(action);
             } else {
-              computeIfAbsent(actionsByServer, loc.getServerName(),
-                () -> new ServerRequest(defaultRpcTimeoutNs)).addAction(loc, action,
-                  getRpcTimeoutNs(action));
+              computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new)
+                  .addAction(loc, action);
             }
           }))
         .toArray(CompletableFuture[]::new)).whenComplete((v, r) -> {

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index d7a3ed1..f656a6c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -55,14 +55,21 @@ class AsyncClientScanner {
 
   private final AsyncConnectionImpl conn;
 
+  private final long pauseNs;
+
+  private final int maxAttempts;
+
   private final long scanTimeoutNs;
 
   private final long rpcTimeoutNs;
 
+  private final int startLogErrorsCnt;
+
   private final ScanResultCache resultCache;
 
   public AsyncClientScanner(Scan scan, RawScanResultConsumer consumer, TableName tableName,
-      AsyncConnectionImpl conn, long scanTimeoutNs, long rpcTimeoutNs) {
+      AsyncConnectionImpl conn, long pauseNs, int maxAttempts, long scanTimeoutNs,
+      long rpcTimeoutNs, int startLogErrorsCnt) {
     if (scan.getStartRow() == null) {
       scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
     }
@@ -73,8 +80,11 @@ class AsyncClientScanner {
     this.consumer = consumer;
     this.tableName = tableName;
     this.conn = conn;
+    this.pauseNs = pauseNs;
+    this.maxAttempts = maxAttempts;
     this.scanTimeoutNs = scanTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
+    this.startLogErrorsCnt = startLogErrorsCnt;
     this.resultCache = scan.getAllowPartialResults() || scan.getBatch() > 0
         ? new AllowPartialScanResultCache() : new CompleteScanResultCache();
   }
@@ -117,7 +127,9 @@ class AsyncClientScanner {
     conn.callerFactory.scanSingleRegion().id(resp.scannerId).location(resp.loc).stub(resp.stub)
         .setScan(scan).consumer(consumer).resultCache(resultCache)
         .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).start().whenComplete((hasMore, error) -> {
+        .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).start()
+        .whenComplete((hasMore, error) -> {
           if (error != null) {
             consumer.onError(error);
             return;
@@ -133,8 +145,9 @@ class AsyncClientScanner {
   private void openScanner() {
     conn.callerFactory.<OpenScannerResponse> single().table(tableName).row(scan.getStartRow())
         .locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::callOpenScanner).call()
-        .whenComplete((resp, error) -> {
+        .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner)
+        .call().whenComplete((resp, error) -> {
           if (error != null) {
             consumer.onError(error);
             return;

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
index 7b0f339..9f114ac 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
@@ -50,21 +50,32 @@ public interface AsyncConnection extends Closeable {
   AsyncTableRegionLocator getRegionLocator(TableName tableName);
 
   /**
-   * Retrieve an RawAsyncTable implementation for accessing a table. The returned Table is not
-   * thread safe, a new instance should be created for each using thread. This is a lightweight
-   * operation, pooling or caching of the returned AsyncTable is neither required nor desired.
+   * Retrieve an {@link RawAsyncTable} implementation for accessing a table.
+   * <p>
+   * The returned instance will use default configs. Use {@link #getRawTableBuilder(TableName)} if you
+   * want to customize some configs.
    * <p>
    * This method no longer checks table existence. An exception will be thrown if the table does not
    * exist only when the first operation is attempted.
    * @param tableName the name of the table
    * @return an RawAsyncTable to use for interactions with this table
+   * @see #getRawTableBuilder(TableName)
+   */
+  default RawAsyncTable getRawTable(TableName tableName) {
+    return getRawTableBuilder(tableName).build();
+  }
+
+  /**
+   * Returns an {@link AsyncTableBuilder} for creating {@link RawAsyncTable}.
+   * <p>
+   * This method no longer checks table existence. An exception will be thrown if the table does not
+   * exist only when the first operation is attempted.
+   * @param tableName the name of the table
    */
-  RawAsyncTable getRawTable(TableName tableName);
+  AsyncTableBuilder<RawAsyncTable> getRawTableBuilder(TableName tableName);
 
   /**
-   * Retrieve an AsyncTable implementation for accessing a table. The returned Table is not thread
-   * safe, a new instance should be created for each using thread. This is a lightweight operation,
-   * pooling or caching of the returned AsyncTable is neither required nor desired.
+   * Retrieve an AsyncTable implementation for accessing a table.
    * <p>
    * This method no longer checks table existence. An exception will be thrown if the table does not
    * exist only when the first operation is attempted.
@@ -72,5 +83,17 @@ public interface AsyncConnection extends Closeable {
    * @param pool the thread pool to use for executing callback
    * @return an AsyncTable to use for interactions with this table
    */
-  AsyncTable getTable(TableName tableName, ExecutorService pool);
+  default AsyncTable getTable(TableName tableName, ExecutorService pool) {
+    return getTableBuilder(tableName, pool).build();
+  }
+
+  /**
+   * Returns an {@link AsyncTableBuilder} for creating {@link AsyncTable}.
+   * <p>
+   * This method no longer checks table existence. An exception will be thrown if the table does not
+   * exist only when the first operation is attempted.
+   * @param tableName the name of the table
+   * @param pool the thread pool to use for executing callback
+   */
+  AsyncTableBuilder<AsyncTable> getTableBuilder(TableName tableName, ExecutorService pool);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index 6279d46..585a104 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -56,6 +56,10 @@ class AsyncConnectionConfiguration {
   // by this value, see scanTimeoutNs.
   private final long operationTimeoutNs;
 
+  // timeout for each rpc request. Can be overridden by a more specific config, such as
+  // readRpcTimeout or writeRpcTimeout.
+  private final long rpcTimeoutNs;
+
   // timeout for each read rpc request
   private final long readRpcTimeoutNs;
 
@@ -85,10 +89,12 @@ class AsyncConnectionConfiguration {
       conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
     this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
       conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
-    this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY,
-      conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
-    this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY,
-      conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
+    this.rpcTimeoutNs = TimeUnit.MILLISECONDS
+        .toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT));
+    this.readRpcTimeoutNs =
+        TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs));
+    this.writeRpcTimeoutNs =
+        TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs));
     this.pauseNs =
         TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
     this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
@@ -111,6 +117,10 @@ class AsyncConnectionConfiguration {
     return operationTimeoutNs;
   }
 
+  long getRpcTimeoutNs() {
+    return rpcTimeoutNs;
+  }
+
   long getReadRpcTimeoutNs() {
     return readRpcTimeoutNs;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index d660b02..c58500a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -18,8 +18,6 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.HConstants.CLUSTER_ID_DEFAULT;
-import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
-import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
 import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
@@ -90,7 +88,6 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
 
-  @SuppressWarnings("deprecation")
   public AsyncConnectionImpl(Configuration conf, User user) {
     this.conf = conf;
     this.user = user;
@@ -105,7 +102,8 @@ class AsyncConnectionImpl implements AsyncConnection {
     this.rpcClient = RpcClientFactory.createClient(conf, clusterId);
     this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
     this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
-    this.rpcTimeout = conf.getInt(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT);
+    this.rpcTimeout = (int) Math.min(Integer.MAX_VALUE,
+      TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
     this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
     this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
     if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
@@ -152,12 +150,25 @@ class AsyncConnectionImpl implements AsyncConnection {
   }
 
   @Override
-  public RawAsyncTable getRawTable(TableName tableName) {
-    return new RawAsyncTableImpl(this, tableName);
+  public AsyncTableBuilder<RawAsyncTable> getRawTableBuilder(TableName tableName) {
+    return new AsyncTableBuilderBase<RawAsyncTable>(tableName, connConf) {
+
+      @Override
+      public RawAsyncTable build() {
+        return new RawAsyncTableImpl(AsyncConnectionImpl.this, this);
+      }
+    };
   }
 
   @Override
-  public AsyncTable getTable(TableName tableName, ExecutorService pool) {
-    return new AsyncTableImpl(this, tableName, pool);
+  public AsyncTableBuilder<AsyncTable> getTableBuilder(TableName tableName, ExecutorService pool) {
+    return new AsyncTableBuilderBase<AsyncTable>(tableName, connConf) {
+
+      @Override
+      public AsyncTable build() {
+        RawAsyncTableImpl rawTable = new RawAsyncTableImpl(AsyncConnectionImpl.this, this);
+        return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool);
+      }
+    };
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index 7a45ae3..7030eac 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -71,7 +71,7 @@ class AsyncRegionLocator {
       future.completeExceptionally(new TimeoutIOException(timeoutMsg.get()));
     }, timeoutNs, TimeUnit.NANOSECONDS);
     return future.whenComplete((loc, error) -> {
-      if (error.getClass() != TimeoutIOException.class) {
+      if (error != null && error.getClass() != TimeoutIOException.class) {
         // cancel timeout task if we are not completed by it.
         timeoutTask.cancel();
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 55c56ab..76b6a33 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
 
 import io.netty.util.HashedWheelTimer;
 
@@ -46,7 +47,16 @@ class AsyncRpcRetryingCallerFactory {
     this.retryTimer = retryTimer;
   }
 
-  public class SingleRequestCallerBuilder<T> {
+  private abstract class BuilderBase {
+
+    protected long pauseNs = conn.connConf.getPauseNs();
+
+    protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries());
+
+    protected int startLogErrorsCnt = conn.connConf.getStartLogErrorsCnt();
+  }
+
+  public class SingleRequestCallerBuilder<T> extends BuilderBase {
 
     private TableName tableName;
 
@@ -91,12 +101,26 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
+    public SingleRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
+      this.pauseNs = unit.toNanos(pause);
+      return this;
+    }
+
+    public SingleRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
+      this.maxAttempts = maxAttempts;
+      return this;
+    }
+
+    public SingleRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
+      this.startLogErrorsCnt = startLogErrorsCnt;
+      return this;
+    }
+
     public AsyncSingleRequestRpcRetryingCaller<T> build() {
       return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
           checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"),
           checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"),
-          conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs,
-          rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
+          pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
     }
 
     /**
@@ -114,7 +138,7 @@ class AsyncRpcRetryingCallerFactory {
     return new SingleRequestCallerBuilder<>();
   }
 
-  public class SmallScanCallerBuilder {
+  public class SmallScanCallerBuilder extends BuilderBase {
 
     private TableName tableName;
 
@@ -151,12 +175,27 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
+    public SmallScanCallerBuilder pause(long pause, TimeUnit unit) {
+      this.pauseNs = unit.toNanos(pause);
+      return this;
+    }
+
+    public SmallScanCallerBuilder maxAttempts(int maxAttempts) {
+      this.maxAttempts = maxAttempts;
+      return this;
+    }
+
+    public SmallScanCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
+      this.startLogErrorsCnt = startLogErrorsCnt;
+      return this;
+    }
+
     public AsyncSmallScanRpcRetryingCaller build() {
       TableName tableName = checkNotNull(this.tableName, "tableName is null");
       Scan scan = checkNotNull(this.scan, "scan is null");
       checkArgument(limit > 0, "invalid limit %d", limit);
-      return new AsyncSmallScanRpcRetryingCaller(conn, tableName, scan, limit, scanTimeoutNs,
-          rpcTimeoutNs);
+      return new AsyncSmallScanRpcRetryingCaller(conn, tableName, scan, limit, pauseNs, maxAttempts,
+          scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
     }
 
     /**
@@ -174,7 +213,7 @@ class AsyncRpcRetryingCallerFactory {
     return new SmallScanCallerBuilder();
   }
 
-  public class ScanSingleRegionCallerBuilder {
+  public class ScanSingleRegionCallerBuilder extends BuilderBase {
 
     private long scannerId = -1L;
 
@@ -232,15 +271,29 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
+    public ScanSingleRegionCallerBuilder pause(long pause, TimeUnit unit) {
+      this.pauseNs = unit.toNanos(pause);
+      return this;
+    }
+
+    public ScanSingleRegionCallerBuilder maxAttempts(int maxAttempts) {
+      this.maxAttempts = maxAttempts;
+      return this;
+    }
+
+    public ScanSingleRegionCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
+      this.startLogErrorsCnt = startLogErrorsCnt;
+      return this;
+    }
+
     public AsyncScanSingleRegionRpcRetryingCaller build() {
       checkArgument(scannerId >= 0, "invalid scannerId %d", scannerId);
       return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn,
           checkNotNull(scan, "scan is null"), scannerId,
           checkNotNull(resultCache, "resultCache is null"),
           checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
-          checkNotNull(loc, "location is null"), conn.connConf.getPauseNs(),
-          conn.connConf.getMaxRetries(), scanTimeoutNs, rpcTimeoutNs,
-          conn.connConf.getStartLogErrorsCnt());
+          checkNotNull(loc, "location is null"), pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs,
+          startLogErrorsCnt);
     }
 
     /**
@@ -258,7 +311,7 @@ class AsyncRpcRetryingCallerFactory {
     return new ScanSingleRegionCallerBuilder();
   }
 
-  public class BatchCallerBuilder {
+  public class BatchCallerBuilder extends BuilderBase {
 
     private TableName tableName;
 
@@ -266,9 +319,7 @@ class AsyncRpcRetryingCallerFactory {
 
     private long operationTimeoutNs = -1L;
 
-    private long readRpcTimeoutNs = -1L;
-
-    private long writeRpcTimeoutNs = -1L;
+    private long rpcTimeoutNs = -1L;
 
     public BatchCallerBuilder table(TableName tableName) {
       this.tableName = tableName;
@@ -285,20 +336,29 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
-    public BatchCallerBuilder readRpcTimeout(long rpcTimeout, TimeUnit unit) {
-      this.readRpcTimeoutNs = unit.toNanos(rpcTimeout);
+    public BatchCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
+      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
+      return this;
+    }
+
+    public BatchCallerBuilder pause(long pause, TimeUnit unit) {
+      this.pauseNs = unit.toNanos(pause);
+      return this;
+    }
+
+    public BatchCallerBuilder maxAttempts(int maxAttempts) {
+      this.maxAttempts = maxAttempts;
       return this;
     }
 
-    public BatchCallerBuilder writeRpcTimeout(long rpcTimeout, TimeUnit unit) {
-      this.writeRpcTimeoutNs = unit.toNanos(rpcTimeout);
+    public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
+      this.startLogErrorsCnt = startLogErrorsCnt;
       return this;
     }
 
     public <T> AsyncBatchRpcRetryingCaller<T> build() {
-      return new AsyncBatchRpcRetryingCaller<T>(retryTimer, conn, tableName, actions,
-          conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs,
-          readRpcTimeoutNs, writeRpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
+      return new AsyncBatchRpcRetryingCaller<T>(retryTimer, conn, tableName, actions, pauseNs,
+          maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
     }
 
     public <T> List<CompletableFuture<T>> call() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index dae88a7..5d3b736 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -108,7 +108,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
   public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer,
       AsyncConnectionImpl conn, Scan scan, long scannerId, ScanResultCache resultCache,
       RawScanResultConsumer consumer, Interface stub, HRegionLocation loc, long pauseNs,
-      int maxRetries, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+      int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
     this.scan = scan;
     this.scannerId = scannerId;
@@ -117,7 +117,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     this.stub = stub;
     this.loc = loc;
     this.pauseNs = pauseNs;
-    this.maxAttempts = retries2Attempts(maxRetries);
+    this.maxAttempts = maxAttempts;
     this.scanTimeoutNs = scanTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
     this.startLogErrorsCnt = startLogErrorsCnt;

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 04e69af..4ce6a18 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.client;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
 
 import io.netty.util.HashedWheelTimer;
@@ -90,7 +89,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
 
   public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
       TableName tableName, byte[] row, RegionLocateType locateType, Callable<T> callable,
-      long pauseNs, int maxRetries, long operationTimeoutNs, long rpcTimeoutNs,
+      long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
       int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
     this.conn = conn;
@@ -99,7 +98,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
     this.locateType = locateType;
     this.callable = callable;
     this.pauseNs = pauseNs;
-    this.maxAttempts = retries2Attempts(maxRetries);
+    this.maxAttempts = maxAttempts;
     this.operationTimeoutNs = operationTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
     this.startLogErrorsCnt = startLogErrorsCnt;

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
index 6ffa30a..98a276f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
@@ -57,6 +57,12 @@ class AsyncSmallScanRpcRetryingCaller {
 
   private final long rpcTimeoutNs;
 
+  private final long pauseNs;
+
+  private final int maxAttempts;
+
+  private final int startLogErrosCnt;
+
   private final Function<HRegionInfo, Boolean> nextScan;
 
   private final List<Result> resultList;
@@ -64,13 +70,17 @@ class AsyncSmallScanRpcRetryingCaller {
   private final CompletableFuture<List<Result>> future;
 
   public AsyncSmallScanRpcRetryingCaller(AsyncConnectionImpl conn, TableName tableName, Scan scan,
-      int limit, long scanTimeoutNs, long rpcTimeoutNs) {
+      int limit, long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
+      int startLogErrosCnt) {
     this.conn = conn;
     this.tableName = tableName;
     this.scan = scan;
     this.limit = limit;
     this.scanTimeoutNs = scanTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
+    this.pauseNs = pauseNs;
+    this.maxAttempts = maxAttempts;
+    this.startLogErrosCnt = startLogErrosCnt;
     if (scan.isReversed()) {
       this.nextScan = this::reversedNextScan;
     } else {
@@ -146,7 +156,8 @@ class AsyncSmallScanRpcRetryingCaller {
   private void scan() {
     conn.callerFactory.<SmallScanResponse> single().table(tableName).row(scan.getStartRow())
         .locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::scan).call()
+        .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrosCnt).action(this::scan).call()
         .whenComplete((resp, error) -> {
           if (error != null) {
             future.completeExceptionally(error);

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index 893beb9..402ad64 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -23,9 +23,11 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 /**
  * The asynchronous table for normal users.
  * <p>
+ * The implementation is required to be thread safe.
+ * <p>
  * The implementation should make sure that user can do everything they want to the returned
- * {@code CompletableFuture} without break anything. Usually the implementation will require user to
- * provide a {@code ExecutorService}.
+ * {@code CompletableFuture} without breaking anything. Usually the implementation will require user
+ * to provide a {@code ExecutorService}.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Unstable

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
index d80627f..d82fa22 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
@@ -18,9 +18,8 @@
 package org.apache.hadoop.hbase.client;
 
 import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.allOf;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.voidBatch;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.voidBatchAll;
 
 import com.google.common.base.Preconditions;
 
@@ -39,10 +38,9 @@ import org.apache.hadoop.hbase.util.Bytes;
  * The base interface for asynchronous version of Table. Obtain an instance from a
  * {@link AsyncConnection}.
  * <p>
- * The implementation is NOT required to be thread safe. Do NOT access it from multiple threads
- * concurrently.
+ * The implementation is required to be thread safe.
  * <p>
- * Usually the implementations will not throw any exception directly, you need to get the exception
+ * Usually the implementation will not throw any exception directly. You need to get the exception
  * from the returned {@link CompletableFuture}.
  */
 @InterfaceAudience.Public
@@ -62,12 +60,12 @@ public interface AsyncTableBase {
   Configuration getConfiguration();
 
   /**
-   * Set timeout of each rpc read request in operations of this Table instance, will override the
-   * value of {@code hbase.rpc.read.timeout} in configuration. If a rpc read request waiting too
-   * long, it will stop waiting and send a new request to retry until retries exhausted or operation
-   * timeout reached.
+   * Get timeout of each rpc request in this Table instance. It will be overridden by a more
+   * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout.
+   * @see #getReadRpcTimeout(TimeUnit)
+   * @see #getWriteRpcTimeout(TimeUnit)
    */
-  void setReadRpcTimeout(long timeout, TimeUnit unit);
+  long getRpcTimeout(TimeUnit unit);
 
   /**
    * Get timeout of each rpc read request in this Table instance.
@@ -75,47 +73,18 @@ public interface AsyncTableBase {
   long getReadRpcTimeout(TimeUnit unit);
 
   /**
-   * Set timeout of each rpc write request in operations of this Table instance, will override the
-   * value of {@code hbase.rpc.write.timeout} in configuration. If a rpc write request waiting too
-   * long, it will stop waiting and send a new request to retry until retries exhausted or operation
-   * timeout reached.
-   */
-  void setWriteRpcTimeout(long timeout, TimeUnit unit);
-
-  /**
    * Get timeout of each rpc write request in this Table instance.
    */
   long getWriteRpcTimeout(TimeUnit unit);
 
   /**
-   * Set timeout of each operation in this Table instance, will override the value of
-   * {@code hbase.client.operation.timeout} in configuration.
-   * <p>
-   * Operation timeout is a top-level restriction that makes sure an operation will not be blocked
-   * more than this. In each operation, if rpc request fails because of timeout or other reason, it
-   * will retry until success or throw a RetriesExhaustedException. But if the total time elapsed
-   * reach the operation timeout before retries exhausted, it will break early and throw
-   * SocketTimeoutException.
-   */
-  void setOperationTimeout(long timeout, TimeUnit unit);
-
-  /**
    * Get timeout of each operation in Table instance.
    */
   long getOperationTimeout(TimeUnit unit);
 
   /**
-   * Set timeout of a single operation in a scan, such as openScanner and next. Will override the
-   * value {@code hbase.client.scanner.timeout.period} in configuration.
-   * <p>
-   * Generally a scan will never timeout after we add heartbeat support unless the region is
-   * crashed. The {@code scanTimeout} works like the {@code operationTimeout} for each single
-   * operation in a scan.
-   */
-  void setScanTimeout(long timeout, TimeUnit unit);
-
-  /**
-   * Get the timeout of a single operation in a scan.
+   * Get the timeout of a single operation in a scan. It works like operation timeout for other
+   * operations.
    */
   long getScanTimeout(TimeUnit unit);
 
@@ -353,29 +322,6 @@ public interface AsyncTableBase {
   CompletableFuture<List<Result>> smallScan(Scan scan, int limit);
 
   /**
-   * Extracts certain cells from the given rows, in batch.
-   * <p>
-   * Notice that you may not get all the results with this function, which means some of the
-   * returned {@link CompletableFuture}s may succeed while some of the other returned
-   * {@link CompletableFuture}s may fail.
-   * @param gets The objects that specify what data to fetch and from which rows.
-   * @return A list of {@link CompletableFuture}s that represent the result for each get.
-   */
-  default List<CompletableFuture<Result>> get(List<Get> gets) {
-    return batch(gets);
-  }
-
-  /**
-   * A simple version for batch get. It will fail if there are any failures and you will get the
-   * whole result list at once if the operation is succeeded.
-   * @param gets The objects that specify what data to fetch and from which rows.
-   * @return A {@link CompletableFuture} that wrapper the result list.
-   */
-  default CompletableFuture<List<Result>> getAll(List<Get> gets) {
-    return batchAll(gets);
-  }
-
-  /**
    * Test for the existence of columns in the table, as specified by the Gets.
    * <p>
    * This will return a list of booleans. Each value will be true if the related Get matches one or
@@ -386,8 +332,8 @@ public interface AsyncTableBase {
    * @return A list of {@link CompletableFuture}s that represent the existence for each get.
    */
   default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
-    return get(toCheckExistenceOnly(gets)).stream().
-        <CompletableFuture<Boolean>>map(f -> f.thenApply(r -> r.getExists())).collect(toList());
+    return get(toCheckExistenceOnly(gets)).stream()
+        .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList());
   }
 
   /**
@@ -397,8 +343,28 @@ public interface AsyncTableBase {
    * @return A {@link CompletableFuture} that wrapper the result boolean list.
    */
   default CompletableFuture<List<Boolean>> existsAll(List<Get> gets) {
-    return getAll(toCheckExistenceOnly(gets))
-        .thenApply(l -> l.stream().map(r -> r.getExists()).collect(toList()));
+    return allOf(exists(gets));
+  }
+
+  /**
+   * Extracts certain cells from the given rows, in batch.
+   * <p>
+   * Notice that you may not get all the results with this function, which means some of the
+   * returned {@link CompletableFuture}s may succeed while some of the other returned
+   * {@link CompletableFuture}s may fail.
+   * @param gets The objects that specify what data to fetch and from which rows.
+   * @return A list of {@link CompletableFuture}s that represent the result for each get.
+   */
+  List<CompletableFuture<Result>> get(List<Get> gets);
+
+  /**
+   * A simple version for batch get. It will fail if there are any failures and you will get the
+   * whole result list at once if the operation is succeeded.
+   * @param gets The objects that specify what data to fetch and from which rows.
+   * @return A {@link CompletableFuture} that wrapper the result list.
+   */
+  default CompletableFuture<List<Result>> getAll(List<Get> gets) {
+    return allOf(get(gets));
   }
 
   /**
@@ -406,9 +372,7 @@ public interface AsyncTableBase {
    * @param puts The list of mutations to apply.
    * @return A list of {@link CompletableFuture}s that represent the result for each put.
    */
-  default List<CompletableFuture<Void>> put(List<Put> puts) {
-    return voidBatch(this, puts);
-  }
+  List<CompletableFuture<Void>> put(List<Put> puts);
 
   /**
    * A simple version of batch put. It will fail if there are any failures.
@@ -416,7 +380,7 @@ public interface AsyncTableBase {
    * @return A {@link CompletableFuture} that always returns null when complete normally.
    */
   default CompletableFuture<Void> putAll(List<Put> puts) {
-    return voidBatchAll(this, puts);
+    return allOf(put(puts)).thenApply(r -> null);
   }
 
   /**
@@ -424,9 +388,7 @@ public interface AsyncTableBase {
    * @param deletes list of things to delete.
    * @return A list of {@link CompletableFuture}s that represent the result for each delete.
    */
-  default List<CompletableFuture<Void>> delete(List<Delete> deletes) {
-    return voidBatch(this, deletes);
-  }
+  List<CompletableFuture<Void>> delete(List<Delete> deletes);
 
   /**
    * A simple version of batch delete. It will fail if there are any failures.
@@ -434,7 +396,7 @@ public interface AsyncTableBase {
    * @return A {@link CompletableFuture} that always returns null when complete normally.
    */
   default CompletableFuture<Void> deleteAll(List<Delete> deletes) {
-    return voidBatchAll(this, deletes);
+    return allOf(delete(deletes)).thenApply(r -> null);
   }
 
   /**
@@ -454,8 +416,6 @@ public interface AsyncTableBase {
    * @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}.
    */
   default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
-    List<CompletableFuture<T>> futures = batch(actions);
-    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
-        .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
+    return allOf(batch(actions));
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
new file mode 100644
index 0000000..2330855
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * For creating {@link AsyncTable} or {@link RawAsyncTable}.
+ * <p>
+ * The implementation should have default configurations set before returning the builder to user.
+ * So users are free to only set the configs they care about to create a new
+ * AsyncTable/RawAsyncTable instance.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface AsyncTableBuilder<T extends AsyncTableBase> {
+
+  /**
+   * Set timeout for a whole operation such as get, put or delete. Notice that scan will not be
+   * effected by this value, see scanTimeoutNs.
+   * <p>
+   * Operation timeout and max attempt times(or max retry times) are both limitations for retrying,
+   * we will stop retrying when we reach any of the limitations.
+   * @see #setMaxAttempts(int)
+   * @see #setMaxRetries(int)
+   * @see #setScanTimeout(long, TimeUnit)
+   */
+  AsyncTableBuilder<T> setOperationTimeout(long timeout, TimeUnit unit);
+
+  /**
+   * As now we have heartbeat support for scan, ideally a scan will never timeout unless the RS is
+   * crash. The RS will always return something before the rpc timed out or scan timed out to tell
+   * the client that it is still alive. The scan timeout is used as operation timeout for every
+   * operation in a scan, such as openScanner or next.
+   * @see #setScanTimeout(long, TimeUnit)
+   */
+  AsyncTableBuilder<T> setScanTimeout(long timeout, TimeUnit unit);
+
+  /**
+   * Set timeout for each rpc request.
+   * <p>
+   * Notice that this will <strong>NOT</strong> change the rpc timeout for read(get, scan) request
+   * and write request(put, delete).
+   */
+  AsyncTableBuilder<T> setRpcTimeout(long timeout, TimeUnit unit);
+
+  /**
+   * Set timeout for each read(get, scan) rpc request.
+   */
+  AsyncTableBuilder<T> setReadRpcTimeout(long timeout, TimeUnit unit);
+
+  /**
+   * Set timeout for each write(put, delete) rpc request.
+   */
+  AsyncTableBuilder<T> setWriteRpcTimeout(long timeout, TimeUnit unit);
+
+  /**
+   * Set the base pause time for retrying. We use an exponential policy to generate sleep time when
+   * retrying.
+   */
+  AsyncTableBuilder<T> setRetryPause(long pause, TimeUnit unit);
+
+  /**
+   * Set the max retry times for an operation. Usually it is the max attempt times minus 1.
+   * <p>
+   * Operation timeout and max attempt times(or max retry times) are both limitations for retrying,
+   * we will stop retrying when we reach any of the limitations.
+   * @see #setMaxAttempts(int)
+   * @see #setOperationTimeout(long, TimeUnit)
+   */
+  default AsyncTableBuilder<T> setMaxRetries(int maxRetries) {
+    return setMaxAttempts(retries2Attempts(maxRetries));
+  }
+
+  /**
+   * Set the max attempt times for an operation. Usually it is the max retry times plus 1. Operation
+   * timeout and max attempt times(or max retry times) are both limitations for retrying, we will
+   * stop retrying when we reach any of the limitations.
+   * @see #setMaxRetries(int)
+   * @see #setOperationTimeout(long, TimeUnit)
+   */
+  AsyncTableBuilder<T> setMaxAttempts(int maxAttempts);
+
+  /**
+   * Set the number of retries that are allowed before we start to log.
+   */
+  AsyncTableBuilder<T> setStartLogErrorsCnt(int startLogErrorsCnt);
+
+  /**
+   * Create the {@link AsyncTable} or {@link RawAsyncTable} instance.
+   */
+  T build();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
new file mode 100644
index 0000000..766895e
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Base class for all asynchronous table builders.
+ */
+@InterfaceAudience.Private
+abstract class AsyncTableBuilderBase<T extends AsyncTableBase> implements AsyncTableBuilder<T> {
+
+  protected TableName tableName;
+
+  protected long operationTimeoutNs;
+
+  protected long scanTimeoutNs;
+
+  protected long rpcTimeoutNs;
+
+  protected long readRpcTimeoutNs;
+
+  protected long writeRpcTimeoutNs;
+
+  protected long pauseNs;
+
+  protected int maxAttempts;
+
+  protected int startLogErrorsCnt;
+
+  AsyncTableBuilderBase(TableName tableName, AsyncConnectionConfiguration connConf) {
+    this.tableName = tableName;
+    this.operationTimeoutNs = tableName.isSystemTable() ? connConf.getMetaOperationTimeoutNs()
+        : connConf.getOperationTimeoutNs();
+    this.scanTimeoutNs = connConf.getScanTimeoutNs();
+    this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
+    this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs();
+    this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs();
+    this.pauseNs = connConf.getPauseNs();
+    this.maxAttempts = retries2Attempts(connConf.getMaxRetries());
+    this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
+  }
+
+  @Override
+  public AsyncTableBuilderBase<T> setOperationTimeout(long timeout, TimeUnit unit) {
+    this.operationTimeoutNs = unit.toNanos(timeout);
+    return this;
+  }
+
+  @Override
+  public AsyncTableBuilderBase<T> setScanTimeout(long timeout, TimeUnit unit) {
+    this.scanTimeoutNs = unit.toNanos(timeout);
+    return this;
+  }
+
+  @Override
+  public AsyncTableBuilderBase<T> setRpcTimeout(long timeout, TimeUnit unit) {
+    this.rpcTimeoutNs = unit.toNanos(timeout);
+    return this;
+  }
+
+  @Override
+  public AsyncTableBuilderBase<T> setReadRpcTimeout(long timeout, TimeUnit unit) {
+    this.readRpcTimeoutNs = unit.toNanos(timeout);
+    return this;
+  }
+
+  @Override
+  public AsyncTableBuilderBase<T> setWriteRpcTimeout(long timeout, TimeUnit unit) {
+    this.writeRpcTimeoutNs = unit.toNanos(timeout);
+    return this;
+  }
+
+  @Override
+  public AsyncTableBuilderBase<T> setRetryPause(long pause, TimeUnit unit) {
+    this.pauseNs = unit.toNanos(pause);
+    return this;
+  }
+
+  @Override
+  public AsyncTableBuilderBase<T> setMaxAttempts(int maxAttempts) {
+    this.maxAttempts = maxAttempts;
+    return this;
+  }
+
+  @Override
+  public AsyncTableBuilderBase<T> setStartLogErrorsCnt(int startLogErrorsCnt) {
+    this.startLogErrorsCnt = startLogErrorsCnt;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index 7281185..7cd257c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -22,7 +22,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import static java.util.stream.Collectors.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
@@ -42,8 +42,8 @@ class AsyncTableImpl implements AsyncTable {
 
   private final long defaultScannerMaxResultSize;
 
-  public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName, ExecutorService pool) {
-    this.rawTable = conn.getRawTable(tableName);
+  AsyncTableImpl(AsyncConnectionImpl conn, RawAsyncTable rawTable, ExecutorService pool) {
+    this.rawTable = rawTable;
     this.pool = pool;
     this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
   }
@@ -59,8 +59,8 @@ class AsyncTableImpl implements AsyncTable {
   }
 
   @Override
-  public void setReadRpcTimeout(long timeout, TimeUnit unit) {
-    rawTable.setReadRpcTimeout(timeout, unit);
+  public long getRpcTimeout(TimeUnit unit) {
+    return rawTable.getRpcTimeout(unit);
   }
 
   @Override
@@ -69,31 +69,16 @@ class AsyncTableImpl implements AsyncTable {
   }
 
   @Override
-  public void setWriteRpcTimeout(long timeout, TimeUnit unit) {
-    rawTable.setWriteRpcTimeout(timeout, unit);
-  }
-
-  @Override
   public long getWriteRpcTimeout(TimeUnit unit) {
     return rawTable.getWriteRpcTimeout(unit);
   }
 
   @Override
-  public void setOperationTimeout(long timeout, TimeUnit unit) {
-    rawTable.setOperationTimeout(timeout, unit);
-  }
-
-  @Override
   public long getOperationTimeout(TimeUnit unit) {
     return rawTable.getOperationTimeout(unit);
   }
 
   @Override
-  public void setScanTimeout(long timeout, TimeUnit unit) {
-    rawTable.setScanTimeout(timeout, unit);
-  }
-
-  @Override
   public long getScanTimeout(TimeUnit unit) {
     return rawTable.getScanTimeout(unit);
   }
@@ -194,7 +179,22 @@ class AsyncTableImpl implements AsyncTable {
   }
 
   @Override
+  public List<CompletableFuture<Result>> get(List<Get> gets) {
+    return rawTable.get(gets).stream().map(this::wrap).collect(toList());
+  }
+
+  @Override
+  public List<CompletableFuture<Void>> put(List<Put> puts) {
+    return rawTable.put(puts).stream().map(this::wrap).collect(toList());
+  }
+
+  @Override
+  public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
+    return rawTable.delete(deletes).stream().map(this::wrap).collect(toList());
+  }
+
+  @Override
   public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
-    return rawTable.<T> batch(actions).stream().map(this::wrap).collect(Collectors.toList());
+    return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList());
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 6f4a844..1abf3f2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -342,16 +342,6 @@ public final class ConnectionUtils {
     return gets.stream().map(ConnectionUtils::toCheckExistenceOnly).collect(toList());
   }
 
-  static List<CompletableFuture<Void>> voidBatch(AsyncTableBase table,
-      List<? extends Row> actions) {
-    return table.<Object> batch(actions).stream().map(f -> f.<Void> thenApply(r -> null))
-        .collect(toList());
-  }
-
-  static CompletableFuture<Void> voidBatchAll(AsyncTableBase table, List<? extends Row> actions) {
-    return table.<Object> batchAll(actions).thenApply(r -> null);
-  }
-
   static RegionLocateType getLocateType(Scan scan) {
     if (scan.isReversed()) {
       if (isEmptyStartRow(scan.getStartRow())) {
@@ -389,4 +379,9 @@ public final class ConnectionUtils {
     // the region.
     return Bytes.compareTo(info.getStartKey(), scan.getStopRow()) <= 0;
   }
+
+  static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
+    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
+        .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
index 0c292a6..67099e8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 /**
  * A low level asynchronous table.
  * <p>
+ * The implementation is required to be thread safe.
+ * <p>
  * The returned {@code CompletableFuture} will be finished directly in the rpc framework's callback
  * thread, so typically you should not do any time consuming work inside these methods, otherwise
  * you will be likely to block at least one connection to RS(even more if the rpc framework uses

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 347c85b..d9d2d35 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
 
 import java.io.IOException;
@@ -67,24 +68,35 @@ class RawAsyncTableImpl implements RawAsyncTable {
 
   private final long defaultScannerMaxResultSize;
 
-  private long readRpcTimeoutNs;
+  private final long rpcTimeoutNs;
 
-  private long writeRpcTimeoutNs;
+  private final long readRpcTimeoutNs;
 
-  private long operationTimeoutNs;
+  private final long writeRpcTimeoutNs;
 
-  private long scanTimeoutNs;
+  private final long operationTimeoutNs;
 
-  public RawAsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) {
+  private final long scanTimeoutNs;
+
+  private final long pauseNs;
+
+  private final int maxAttempts;
+
+  private final int startLogErrorsCnt;
+
+  RawAsyncTableImpl(AsyncConnectionImpl conn, AsyncTableBuilderBase<?> builder) {
     this.conn = conn;
-    this.tableName = tableName;
-    this.readRpcTimeoutNs = conn.connConf.getReadRpcTimeoutNs();
-    this.writeRpcTimeoutNs = conn.connConf.getWriteRpcTimeoutNs();
-    this.operationTimeoutNs = tableName.isSystemTable() ? conn.connConf.getMetaOperationTimeoutNs()
-        : conn.connConf.getOperationTimeoutNs();
+    this.tableName = builder.tableName;
+    this.rpcTimeoutNs = builder.rpcTimeoutNs;
+    this.readRpcTimeoutNs = builder.readRpcTimeoutNs;
+    this.writeRpcTimeoutNs = builder.writeRpcTimeoutNs;
+    this.operationTimeoutNs = builder.operationTimeoutNs;
+    this.scanTimeoutNs = builder.scanTimeoutNs;
+    this.pauseNs = builder.pauseNs;
+    this.maxAttempts = builder.maxAttempts;
+    this.startLogErrorsCnt = builder.startLogErrorsCnt;
     this.defaultScannerCaching = conn.connConf.getScannerCaching();
     this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
-    this.scanTimeoutNs = conn.connConf.getScanTimeoutNs();
   }
 
   @Override
@@ -178,7 +190,9 @@ class RawAsyncTableImpl implements RawAsyncTable {
   private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long rpcTimeoutNs) {
     return conn.callerFactory.<T> single().table(tableName).row(row)
         .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS);
+        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+        .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+        .startLogErrorsCnt(startLogErrorsCnt);
   }
 
   private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
@@ -214,7 +228,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
   @Override
   public CompletableFuture<Result> append(Append append) {
     checkHasFamilies(append);
-    return this.<Result> newCaller(append, writeRpcTimeoutNs)
+    return this.<Result> newCaller(append, rpcTimeoutNs)
         .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub,
           append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
         .call();
@@ -223,7 +237,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
   @Override
   public CompletableFuture<Result> increment(Increment increment) {
     checkHasFamilies(increment);
-    return this.<Result> newCaller(increment, writeRpcTimeoutNs)
+    return this.<Result> newCaller(increment, rpcTimeoutNs)
         .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc,
           stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
         .call();
@@ -232,7 +246,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
   @Override
   public CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
       CompareOp compareOp, byte[] value, Put put) {
-    return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
+    return this.<Boolean> newCaller(row, rpcTimeoutNs)
         .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc,
           stub, put,
           (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
@@ -244,7 +258,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
   @Override
   public CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
       CompareOp compareOp, byte[] value, Delete delete) {
-    return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
+    return this.<Boolean> newCaller(row, rpcTimeoutNs)
         .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
           loc, stub, delete,
           (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
@@ -303,20 +317,18 @@ class RawAsyncTableImpl implements RawAsyncTable {
           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
           regionMutationBuilder.setAtomic(true);
           return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
-        }, (resp) -> {
-          return null;
-        })).call();
+        }, resp -> null)).call();
   }
 
   @Override
   public CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
       CompareOp compareOp, byte[] value, RowMutations mutation) {
-    return this.<Boolean> newCaller(mutation, writeRpcTimeoutNs)
+    return this.<Boolean> newCaller(mutation, rpcTimeoutNs)
         .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
           stub, mutation,
           (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
             new BinaryComparator(value), CompareType.valueOf(compareOp.name()), rm),
-          (resp) -> resp.getExists()))
+          resp -> resp.getExists()))
         .call();
   }
 
@@ -349,7 +361,8 @@ class RawAsyncTableImpl implements RawAsyncTable {
     }
     return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan))
         .limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
-        .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
+        .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
   }
 
   public void scan(Scan scan, RawScanResultConsumer consumer) {
@@ -362,55 +375,63 @@ class RawAsyncTableImpl implements RawAsyncTable {
       }
     }
     scan = setDefaultScanConfig(scan);
-    new AsyncClientScanner(scan, consumer, tableName, conn, scanTimeoutNs, readRpcTimeoutNs)
-        .start();
+    new AsyncClientScanner(scan, consumer, tableName, conn, pauseNs, maxAttempts, scanTimeoutNs,
+        readRpcTimeoutNs, startLogErrorsCnt).start();
   }
 
   @Override
-  public void setReadRpcTimeout(long timeout, TimeUnit unit) {
-    this.readRpcTimeoutNs = unit.toNanos(timeout);
+  public List<CompletableFuture<Result>> get(List<Get> gets) {
+    return batch(gets, readRpcTimeoutNs);
   }
 
   @Override
-  public long getReadRpcTimeout(TimeUnit unit) {
-    return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS);
+  public List<CompletableFuture<Void>> put(List<Put> puts) {
+    return voidMutate(puts);
   }
-
   @Override
-  public void setWriteRpcTimeout(long timeout, TimeUnit unit) {
-    this.writeRpcTimeoutNs = unit.toNanos(timeout);
+  public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
+    return voidMutate(deletes);
   }
 
   @Override
-  public long getWriteRpcTimeout(TimeUnit unit) {
-    return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS);
+  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
+    return batch(actions, rpcTimeoutNs);
+  }
+
+  private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
+    return this.<Object> batch(actions, writeRpcTimeoutNs).stream()
+        .map(f -> f.<Void> thenApply(r -> null)).collect(toList());
+  }
+
+  private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
+    return conn.callerFactory.batch().table(tableName).actions(actions)
+        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
   }
 
   @Override
-  public void setOperationTimeout(long timeout, TimeUnit unit) {
-    this.operationTimeoutNs = unit.toNanos(timeout);
+  public long getRpcTimeout(TimeUnit unit) {
+    return unit.convert(rpcTimeoutNs, TimeUnit.NANOSECONDS);
   }
 
   @Override
-  public long getOperationTimeout(TimeUnit unit) {
-    return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
+  public long getReadRpcTimeout(TimeUnit unit) {
+    return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS);
   }
 
   @Override
-  public void setScanTimeout(long timeout, TimeUnit unit) {
-    this.scanTimeoutNs = unit.toNanos(timeout);
+  public long getWriteRpcTimeout(TimeUnit unit) {
+    return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS);
   }
 
   @Override
-  public long getScanTimeout(TimeUnit unit) {
-    return TimeUnit.NANOSECONDS.convert(scanTimeoutNs, unit);
+  public long getOperationTimeout(TimeUnit unit) {
+    return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
   }
 
   @Override
-  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
-    return conn.callerFactory.batch().table(tableName).actions(actions)
-        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
-        .readRpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .writeRpcTimeout(writeRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
+  public long getScanTimeout(TimeUnit unit) {
+    return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 1eec691..609e9a5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -875,10 +875,7 @@ public final class HConstants {
 
   /**
    * timeout for each RPC
-   * @deprecated Use {@link #HBASE_RPC_READ_TIMEOUT_KEY} or {@link #HBASE_RPC_WRITE_TIMEOUT_KEY}
-   * instead.
    */
-  @Deprecated
   public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout";
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
index 4a391e0..7f54449 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
@@ -30,16 +30,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -60,7 +58,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
 
   private static byte[] VALUE = Bytes.toBytes("value");
 
-  private AsyncConnectionImpl asyncConn;
+  private static AsyncConnectionImpl CONN;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -68,38 +66,24 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
     TEST_UTIL.getAdmin().setBalancerRunning(false, true);
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
   }
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
+    IOUtils.closeQuietly(CONN);
     TEST_UTIL.shutdownMiniCluster();
   }
 
-  @After
-  public void tearDown() {
-    if (asyncConn != null) {
-      asyncConn.close();
-      asyncConn = null;
-    }
-  }
-
-  private void initConn(int startLogErrorsCnt, long pauseMs, int maxRetires) throws IOException {
-    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
-    conf.setInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, startLogErrorsCnt);
-    conf.setLong(HConstants.HBASE_CLIENT_PAUSE, pauseMs);
-    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, maxRetires);
-    asyncConn = new AsyncConnectionImpl(conf, User.getCurrent());
-  }
-
   @Test
   public void testRegionMove() throws InterruptedException, ExecutionException, IOException {
-    initConn(0, 100, 30);
     // This will leave a cached entry in location cache
-    HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
+    HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
     int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegionInfo().getRegionName());
     TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(), Bytes.toBytes(
       TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName().getServerName()));
-    RawAsyncTable table = asyncConn.getRawTable(TABLE_NAME);
+    RawAsyncTable table = CONN.getRawTableBuilder(TABLE_NAME).setRetryPause(100, TimeUnit.MILLISECONDS)
+        .setMaxRetries(30).build();
     table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
 
     // move back
@@ -117,9 +101,9 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
 
   @Test
   public void testMaxRetries() throws IOException, InterruptedException {
-    initConn(0, 10, 2);
     try {
-      asyncConn.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS)
+      CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS)
+          .maxAttempts(3).pause(10, TimeUnit.MILLISECONDS)
           .action((controller, loc, stub) -> failedFuture()).call().get();
       fail();
     } catch (ExecutionException e) {
@@ -129,14 +113,14 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
 
   @Test
   public void testOperationTimeout() throws IOException, InterruptedException {
-    initConn(0, 100, Integer.MAX_VALUE);
     long startNs = System.nanoTime();
     try {
-      asyncConn.callerFactory.single().table(TABLE_NAME).row(ROW)
-          .operationTimeout(1, TimeUnit.SECONDS).action((controller, loc, stub) -> failedFuture())
-          .call().get();
+      CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.SECONDS)
+          .pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE)
+          .action((controller, loc, stub) -> failedFuture()).call().get();
       fail();
     } catch (ExecutionException e) {
+      e.printStackTrace();
       assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
     }
     long costNs = System.nanoTime() - startNs;
@@ -146,12 +130,11 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
 
   @Test
   public void testLocateError() throws IOException, InterruptedException, ExecutionException {
-    initConn(0, 100, 5);
     AtomicBoolean errorTriggered = new AtomicBoolean(false);
     AtomicInteger count = new AtomicInteger(0);
-    HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
+    HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
     AsyncRegionLocator mockedLocator =
-        new AsyncRegionLocator(asyncConn, AsyncConnectionImpl.RETRY_TIMER) {
+        new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) {
           @Override
           CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
               RegionLocateType locateType, long timeoutNs) {
@@ -174,14 +157,15 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
           }
         };
     try (AsyncConnectionImpl mockedConn =
-        new AsyncConnectionImpl(asyncConn.getConfiguration(), User.getCurrent()) {
+        new AsyncConnectionImpl(CONN.getConfiguration(), User.getCurrent()) {
 
           @Override
           AsyncRegionLocator getLocator() {
             return mockedLocator;
           }
         }) {
-      RawAsyncTable table = new RawAsyncTableImpl(mockedConn, TABLE_NAME);
+      RawAsyncTable table = mockedConn.getRawTableBuilder(TABLE_NAME)
+          .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build();
       table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
       assertTrue(errorTriggered.get());
       errorTriggered.set(false);

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
index 82fe3cd..880114a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
@@ -18,8 +18,6 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
-import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
-import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.TABLES_ON_MASTER;
 import static org.junit.Assert.assertEquals;
 
@@ -33,6 +31,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -72,6 +71,8 @@ public class TestAsyncTableGetMultiThreaded {
 
   private static AsyncConnection CONN;
 
+  private static RawAsyncTable TABLE;
+
   private static byte[][] SPLIT_KEYS;
 
   @BeforeClass
@@ -79,14 +80,13 @@ public class TestAsyncTableGetMultiThreaded {
     setUp(HColumnDescriptor.MemoryCompaction.NONE);
   }
 
-  protected static void setUp(HColumnDescriptor.MemoryCompaction memoryCompaction) throws Exception {
+  protected static void setUp(HColumnDescriptor.MemoryCompaction memoryCompaction)
+      throws Exception {
     TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
     TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
-    TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L);
-    TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000);
     TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
     TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
-        String.valueOf(memoryCompaction));
+      String.valueOf(memoryCompaction));
 
     TEST_UTIL.startMiniCluster(5);
     SPLIT_KEYS = new byte[8][];
@@ -96,10 +96,11 @@ public class TestAsyncTableGetMultiThreaded {
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
     CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
-    CONN.getRawTable(TABLE_NAME)
-        .putAll(
-          IntStream.range(0, COUNT).mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
-              .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))).collect(Collectors.toList()))
+    TABLE = CONN.getRawTableBuilder(TABLE_NAME).setReadRpcTimeout(1, TimeUnit.SECONDS)
+        .setMaxRetries(1000).build();
+    TABLE.putAll(
+      IntStream.range(0, COUNT).mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
+          .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))).collect(Collectors.toList()))
         .get();
   }
 
@@ -112,11 +113,8 @@ public class TestAsyncTableGetMultiThreaded {
   private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException {
     while (!stop.get()) {
       for (int i = 0; i < COUNT; i++) {
-        assertEquals(i,
-            Bytes.toInt(
-                CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i))))
-                    .get()
-                    .getValue(FAMILY, QUALIFIER)));
+        assertEquals(i, Bytes.toInt(TABLE.get(new Get(Bytes.toBytes(String.format("%03d", i))))
+            .get().getValue(FAMILY, QUALIFIER)));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
index 270e3e1..9f3970b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Queue;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -122,10 +121,7 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
   @Override
   protected List<Result> doScan(Scan scan) throws Exception {
     SimpleRawScanResultConsumer scanConsumer = new SimpleRawScanResultConsumer();
-    RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME);
-    table.setScanTimeout(1, TimeUnit.HOURS);
-    table.setReadRpcTimeout(1, TimeUnit.HOURS);
-    table.scan(scan, scanConsumer);
+    ASYNC_CONN.getRawTable(TABLE_NAME).scan(scan, scanConsumer);
     List<Result> results = new ArrayList<>();
     for (Result result; (result = scanConsumer.take()) != null;) {
       results.add(result);