Posted to commits@hbase.apache.org by zh...@apache.org on 2017/11/16 06:37:08 UTC

[3/3] hbase git commit: HBASE-19251 Merge RawAsyncTable and AsyncTable

HBASE-19251 Merge RawAsyncTable and AsyncTable


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/54827cf6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/54827cf6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/54827cf6

Branch: refs/heads/master
Commit: 54827cf6139277c8f7c5cfd6833cd4c33a08e9b1
Parents: 3a46550
Author: zhangduo <zh...@apache.org>
Authored: Thu Nov 16 14:36:28 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Nov 16 14:36:28 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/AsyncMetaTableAccessor.java    |  35 +-
 .../client/AdvancedScanResultConsumer.java      | 121 ++++
 .../apache/hadoop/hbase/client/AsyncAdmin.java  |  17 +-
 .../client/AsyncBufferedMutatorBuilderImpl.java |   4 +-
 .../hbase/client/AsyncBufferedMutatorImpl.java  |   6 +-
 .../hadoop/hbase/client/AsyncClientScanner.java |   4 +-
 .../hadoop/hbase/client/AsyncConnection.java    |  29 +-
 .../hbase/client/AsyncConnectionImpl.java       |  90 +--
 .../hadoop/hbase/client/AsyncHBaseAdmin.java    |   7 +-
 .../hbase/client/AsyncNonMetaRegionLocator.java |  85 ++-
 .../client/AsyncRpcRetryingCallerFactory.java   |   4 +-
 .../AsyncScanSingleRegionRpcRetryingCaller.java |  38 +-
 .../AsyncSingleRequestRpcRetryingCaller.java    |   4 +-
 .../apache/hadoop/hbase/client/AsyncTable.java  | 570 ++++++++++++++++++-
 .../hadoop/hbase/client/AsyncTableBase.java     | 414 --------------
 .../hadoop/hbase/client/AsyncTableBuilder.java  |  26 +-
 .../hbase/client/AsyncTableBuilderBase.java     |  21 +-
 .../hadoop/hbase/client/AsyncTableImpl.java     |  83 ++-
 .../hbase/client/AsyncTableResultScanner.java   |  11 +-
 .../hadoop/hbase/client/ConnectionUtils.java    |  23 +-
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java |  26 +-
 .../hadoop/hbase/client/RawAsyncTable.java      | 263 ---------
 .../hadoop/hbase/client/RawAsyncTableImpl.java  |  69 ++-
 .../hbase/client/RawScanResultConsumer.java     | 137 -----
 .../hadoop/hbase/client/ScanResultConsumer.java |  28 +-
 .../hbase/client/ScanResultConsumerBase.java    |  48 ++
 .../hadoop/hbase/client/ServiceCaller.java      |  61 ++
 .../coprocessor/AsyncAggregationClient.java     |  31 +-
 .../client/TestAsyncAggregationClient.java      |   4 +-
 .../client/example/AsyncClientExample.java      |   2 +-
 .../hbase/client/example/HttpProxyExample.java  |  43 +-
 .../hadoop/hbase/PerformanceEvaluation.java     |  20 +-
 .../client/AbstractTestAsyncTableScan.java      |  55 +-
 .../client/BufferingScanResultConsumer.java     |  89 +++
 .../client/SimpleRawScanResultConsumer.java     |  84 ---
 .../hbase/client/TestAsyncBufferMutator.java    |   2 +-
 .../hbase/client/TestAsyncClusterAdminApi.java  |   4 +-
 .../hbase/client/TestAsyncRegionAdminApi.java   |  36 +-
 ...TestAsyncSingleRequestRpcRetryingCaller.java |  12 +-
 .../hadoop/hbase/client/TestAsyncTable.java     |  24 +-
 .../hbase/client/TestAsyncTableAdminApi.java    | 128 ++---
 .../hbase/client/TestAsyncTableBatch.java       |  68 +--
 .../client/TestAsyncTableGetMultiThreaded.java  |  18 +-
 .../hbase/client/TestAsyncTableNoncedRetry.java |   4 +-
 .../hadoop/hbase/client/TestAsyncTableScan.java |   7 +-
 .../hbase/client/TestAsyncTableScanAll.java     |  20 +-
 .../hbase/client/TestAsyncTableScanMetrics.java |   4 +-
 .../client/TestAsyncTableScanRenewLease.java    |   6 +-
 .../hbase/client/TestAsyncTableScanner.java     |  20 +-
 ...stAsyncTableScannerCloseWhileSuspending.java |   2 +-
 .../hbase/client/TestRawAsyncScanCursor.java    |   8 +-
 .../TestRawAsyncTableLimitedScanWithFilter.java |   4 +-
 .../client/TestRawAsyncTablePartialScan.java    |   8 +-
 .../hbase/client/TestRawAsyncTableScan.java     |   8 +-
 54 files changed, 1480 insertions(+), 1455 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
index 6f41bd0..4c1d602 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
@@ -38,10 +38,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.MetaTableAccessor.CollectingVisitor;
 import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
 import org.apache.hadoop.hbase.MetaTableAccessor.Visitor;
+import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
+import org.apache.hadoop.hbase.client.AsyncTable;
 import org.apache.hadoop.hbase.client.Consistency;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.RawAsyncTable;
-import org.apache.hadoop.hbase.client.RawScanResultConsumer;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.Result;
@@ -72,14 +72,15 @@ public class AsyncMetaTableAccessor {
   private static final Pattern SERVER_COLUMN_PATTERN = Pattern
       .compile("^server(_[0-9a-fA-F]{4})?$");
 
-  public static CompletableFuture<Boolean> tableExists(RawAsyncTable metaTable, TableName tableName) {
+  public static CompletableFuture<Boolean> tableExists(AsyncTable<?> metaTable,
+      TableName tableName) {
     if (tableName.equals(META_TABLE_NAME)) {
       return CompletableFuture.completedFuture(true);
     }
     return getTableState(metaTable, tableName).thenApply(Optional::isPresent);
   }
 
-  public static CompletableFuture<Optional<TableState>> getTableState(RawAsyncTable metaTable,
+  public static CompletableFuture<Optional<TableState>> getTableState(AsyncTable<?> metaTable,
       TableName tableName) {
     CompletableFuture<Optional<TableState>> future = new CompletableFuture<>();
     Get get = new Get(tableName.getName()).addColumn(getTableFamily(), getStateColumn());
@@ -110,7 +111,7 @@ public class AsyncMetaTableAccessor {
    * @return HRegionLocation for the given region
    */
   public static CompletableFuture<Optional<HRegionLocation>> getRegionLocation(
-      RawAsyncTable metaTable, byte[] regionName) {
+      AsyncTable<?> metaTable, byte[] regionName) {
     CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>();
     try {
       RegionInfo parsedRegionInfo = MetaTableAccessor.parseRegionInfoFromRegionName(regionName);
@@ -139,7 +140,7 @@ public class AsyncMetaTableAccessor {
    * @return HRegionLocation for the given region
    */
   public static CompletableFuture<Optional<HRegionLocation>> getRegionLocationWithEncodedName(
-      RawAsyncTable metaTable, byte[] encodedRegionName) {
+      AsyncTable<?> metaTable, byte[] encodedRegionName) {
     CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>();
     metaTable.scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY))
         .whenComplete(
@@ -193,7 +194,7 @@ public class AsyncMetaTableAccessor {
    *         {@link CompletableFuture}.
    */
   public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
-      RawAsyncTable metaTable, final Optional<TableName> tableName) {
+      AsyncTable<AdvancedScanResultConsumer> metaTable, Optional<TableName> tableName) {
     CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
     getTableRegionsAndLocations(metaTable, tableName, true).whenComplete(
       (locations, err) -> {
@@ -220,7 +221,7 @@ public class AsyncMetaTableAccessor {
    *         {@link CompletableFuture}.
    */
   private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableRegionsAndLocations(
-      RawAsyncTable metaTable, final Optional<TableName> tableName,
+      AsyncTable<AdvancedScanResultConsumer> metaTable, final Optional<TableName> tableName,
       final boolean excludeOfflinedSplitParents) {
     CompletableFuture<List<Pair<RegionInfo, ServerName>>> future = new CompletableFuture<>();
     if (tableName.filter((t) -> t.equals(TableName.META_TABLE_NAME)).isPresent()) {
@@ -252,7 +253,7 @@ public class AsyncMetaTableAccessor {
         }
         for (HRegionLocation loc : current.get().getRegionLocations()) {
           if (loc != null) {
-            this.results.add(new Pair<RegionInfo, ServerName>(loc.getRegionInfo(), loc
+            this.results.add(new Pair<RegionInfo, ServerName>(loc.getRegion(), loc
                 .getServerName()));
           }
         }
@@ -276,7 +277,7 @@ public class AsyncMetaTableAccessor {
    * @param type scanned part of meta
    * @param visitor Visitor invoked against each row
    */
-  private static CompletableFuture<Void> scanMeta(RawAsyncTable metaTable,
+  private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable,
       Optional<TableName> tableName, QueryType type, final Visitor visitor) {
     return scanMeta(metaTable, getTableStartRowForMeta(tableName, type),
       getTableStopRowForMeta(tableName, type), type, Integer.MAX_VALUE, visitor);
@@ -291,8 +292,9 @@ public class AsyncMetaTableAccessor {
    * @param maxRows maximum rows to return
    * @param visitor Visitor invoked against each row
    */
-  private static CompletableFuture<Void> scanMeta(RawAsyncTable metaTable, Optional<byte[]> startRow,
-      Optional<byte[]> stopRow, QueryType type, int maxRows, final Visitor visitor) {
+  private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable,
+      Optional<byte[]> startRow, Optional<byte[]> stopRow, QueryType type, int maxRows,
+      final Visitor visitor) {
     int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
     Scan scan = getMetaScan(metaTable, rowUpperLimit);
     for (byte[] family : type.getFamilies()) {
@@ -308,11 +310,11 @@ public class AsyncMetaTableAccessor {
     }
 
     CompletableFuture<Void> future = new CompletableFuture<Void>();
-    metaTable.scan(scan, new MetaTableRawScanResultConsumer(rowUpperLimit, visitor, future));
+    metaTable.scan(scan, new MetaTableScanResultConsumer(rowUpperLimit, visitor, future));
     return future;
   }
 
-  private static final class MetaTableRawScanResultConsumer implements RawScanResultConsumer {
+  private static final class MetaTableScanResultConsumer implements AdvancedScanResultConsumer {
 
     private int currentRowCount;
 
@@ -322,7 +324,8 @@ public class AsyncMetaTableAccessor {
 
     private final CompletableFuture<Void> future;
 
-    MetaTableRawScanResultConsumer(int rowUpperLimit, Visitor visitor, CompletableFuture<Void> future) {
+    MetaTableScanResultConsumer(int rowUpperLimit, Visitor visitor,
+        CompletableFuture<Void> future) {
       this.rowUpperLimit = rowUpperLimit;
       this.visitor = visitor;
       this.future = future;
@@ -359,7 +362,7 @@ public class AsyncMetaTableAccessor {
     }
   }
 
-  private static Scan getMetaScan(RawAsyncTable metaTable, int rowUpperLimit) {
+  private static Scan getMetaScan(AsyncTable<?> metaTable, int rowUpperLimit) {
     Scan scan = new Scan();
     int scannerCaching = metaTable.getConfiguration().getInt(HConstants.HBASE_META_SCANNER_CACHING,
       HConstants.DEFAULT_HBASE_META_SCANNER_CACHING);
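Editor's note: after this change the AsyncMetaTableAccessor helpers take a plain AsyncTable obtained from the connection instead of the removed RawAsyncTable. A minimal sketch, assuming an already-built AsyncConnection named conn (the table name is illustrative, not from the patch):

    import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;

    import java.util.concurrent.CompletableFuture;
    import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
    import org.apache.hadoop.hbase.TableName;

    // The meta table handle returned by getTable(META_TABLE_NAME) now satisfies
    // the AsyncTable<?> parameter directly.
    CompletableFuture<Boolean> exists = AsyncMetaTableAccessor
        .tableExists(conn.getTable(META_TABLE_NAME), TableName.valueOf("test_table"));
    exists.thenAccept(b -> System.out.println("table exists: " + b));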

http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdvancedScanResultConsumer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdvancedScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdvancedScanResultConsumer.java
new file mode 100644
index 0000000..10933ab
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdvancedScanResultConsumer.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.Optional;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * This is the low-level API for asynchronous scans.
+ * <p>
+ * All results that match the given scan object will be passed to this class by calling
+ * {@link #onNext(Result[], ScanController)}. {@link #onComplete()} means the scan is finished, and
+ * {@link #onError(Throwable)} means we hit an unrecoverable error and the scan is terminated.
+ * {@link #onHeartbeat(ScanController)} means the RS is still working but we cannot get a valid
+ * result to call {@link #onNext(Result[], ScanController)}. This is usually because the matched
+ * results are too sparse, for example, when a filter that filters out almost everything is set.
+ * <p>
+ * Notice that all the methods here will be called directly in the thread that sends requests to
+ * the HBase service. So if you want the asynchronous scanner to fetch data from HBase in the
+ * background while you process the returned data, move the processing work to another thread so
+ * that the {@link #onNext(Result[], ScanController)} call returns immediately. And please do NOT
+ * do any time-consuming work in these methods unless you know what you are doing.
+ * @since 2.0.0
+ */
+@InterfaceAudience.Public
+public interface AdvancedScanResultConsumer extends ScanResultConsumerBase {
+
+  /**
+   * Used to resume a scan.
+   */
+  @InterfaceAudience.Public
+  interface ScanResumer {
+
+    /**
+     * Resume the scan. You are free to call it multiple times, but only the first call will
+     * take effect.
+     */
+    void resume();
+  }
+
+  /**
+   * Used to suspend or stop a scan, or get a scan cursor if available.
+   * <p>
+   * Notice that you should only call {@link #suspend()} or {@link #terminate()} inside the onNext
+   * or onHeartbeat methods. An IllegalStateException will be thrown if you call them anywhere else.
+   * <p>
+   * You can only call one of {@link #suspend()} and {@link #terminate()} (of course you are free
+   * to call neither), and the methods are not reentrant. An IllegalStateException will be thrown
+   * if you have already called one of them.
+   */
+  @InterfaceAudience.Public
+  interface ScanController {
+
+    /**
+     * Suspend the scan.
+     * <p>
+     * This means we will stop fetching data in the background, i.e., we will not call onNext any
+     * more until you resume the scan.
+     * @return A resumer used to resume the scan later.
+     */
+    ScanResumer suspend();
+
+    /**
+     * Terminate the scan.
+     * <p>
+     * This is useful when you have got enough results and want to stop the scan in the onNext
+     * method, or to stop the scan in the onHeartbeat method because it has taken too much time.
+     */
+    void terminate();
+
+    /**
+     * Get the scan cursor if available.
+     * @return The scan cursor.
+     */
+    Optional<Cursor> cursor();
+  }
+
+  /**
+   * Indicate that we have received some data.
+   * @param results the data fetched from the HBase service.
+   * @param controller used to suspend or terminate the scan. Notice that the {@code controller}
+   *          instance is only valid within the scope of the onNext method. You can only call its
+   *          methods in onNext; do NOT store it and call it later outside onNext.
+   */
+  void onNext(Result[] results, ScanController controller);
+
+  /**
+   * Indicate that there is a heartbeat message but we have not accumulated enough cells to call
+   * {@link #onNext(Result[], ScanController)}.
+   * <p>
+   * Note that this method will always be called when the RS returns something to us but we do not
+   * have enough cells to call {@link #onNext(Result[], ScanController)}. Sometimes it may not be
+   * a 'heartbeat' message for the RS, for example, when we have a large row with many cells and
+   * the size limit is exceeded before all the cells for this row are sent. The RS does send some
+   * data to us and the time limit has not been reached, but we cannot return the data to the
+   * client, so we call this method to tell the client that we have already received something.
+   * <p>
+   * This method gives you a chance to terminate a slow scan operation.
+   * @param controller used to suspend or terminate the scan. Notice that the {@code controller}
+   *          instance is only valid within the scope of the onHeartbeat method. You can only call
+   *          its methods in onHeartbeat; do NOT store it and call it later outside onHeartbeat.
+   */
+  default void onHeartbeat(ScanController controller) {
+  }
+}
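Editor's note: to make the contract above concrete, here is a minimal sketch of an AdvancedScanResultConsumer that collects rows and terminates the scan early. The class name, row limit, and hand-off future are illustrative and not part of the patch; onError/onComplete come from ScanResultConsumerBase.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
    import org.apache.hadoop.hbase.client.Result;

    class LimitedScanConsumer implements AdvancedScanResultConsumer {

      private final int limit; // illustrative cap on the number of rows to collect
      private final List<Result> results = new ArrayList<>();
      private final CompletableFuture<List<Result>> future = new CompletableFuture<>();

      LimitedScanConsumer(int limit) {
        this.limit = limit;
      }

      @Override
      public void onNext(Result[] rs, ScanController controller) {
        for (Result r : rs) {
          results.add(r);
          if (results.size() >= limit) {
            // terminate() may only be called inside onNext/onHeartbeat, per the javadoc above
            controller.terminate();
            return;
          }
        }
      }

      @Override
      public void onError(Throwable error) {
        future.completeExceptionally(error);
      }

      @Override
      public void onComplete() {
        future.complete(results);
      }

      CompletableFuture<List<Result>> future() {
        return future;
      }
    }

A consumer like this would be handed to the observer-style scan, e.g. conn.getTable(tableName).scan(scan, new LimitedScanConsumer(10)), with conn, tableName and scan assumed to exist.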

http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index 722e8b5..c716441 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import com.google.protobuf.RpcChannel;
+
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
@@ -33,7 +35,6 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.RegionLoad;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable;
 import org.apache.hadoop.hbase.client.replication.TableCFs;
 import org.apache.hadoop.hbase.client.security.SecurityCapability;
 import org.apache.hadoop.hbase.quotas.QuotaFilter;
@@ -42,8 +43,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import com.google.protobuf.RpcChannel;
-
 /**
  * The asynchronous administrative API for HBase.
  * @since 2.0.0
@@ -1072,14 +1071,14 @@ public interface AsyncAdmin {
    * </pre>
    * @param stubMaker a delegation to the actual {@code newStub} call.
    * @param callable a delegation to the actual protobuf rpc call. See the comment of
-   *          {@link CoprocessorCallable} for more details.
+   *          {@link ServiceCaller} for more details.
    * @param <S> the type of the asynchronous stub
    * @param <R> the type of the return value
    * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
-   * @see CoprocessorCallable
+   * @see ServiceCaller
    */
   <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
-      CoprocessorCallable<S, R> callable);
+      ServiceCaller<S, R> callable);
 
   /**
    * Execute the given coprocessor call on the given region server.
@@ -1094,15 +1093,15 @@ public interface AsyncAdmin {
    * </pre>
    * @param stubMaker a delegation to the actual {@code newStub} call.
    * @param callable a delegation to the actual protobuf rpc call. See the comment of
-   *          {@link CoprocessorCallable} for more details.
+   *          {@link ServiceCaller} for more details.
    * @param serverName the given region server
    * @param <S> the type of the asynchronous stub
    * @param <R> the type of the return value
    * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
-   * @see CoprocessorCallable
+   * @see ServiceCaller
    */
   <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
-    CoprocessorCallable<S, R> callable, ServerName serverName);
+    ServiceCaller<S, R> callable, ServerName serverName);
 
   /**
    * List all the dead region servers.
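Editor's note: as a rough illustration of the renamed hook, a ServiceCaller is just a lambda delegating to the generated stub. Assuming an AsyncAdmin named admin; MyService, PingRequest and PingResponse stand in for protobuf-generated classes and are not part of this patch:

    import java.util.concurrent.CompletableFuture;

    // Only the coprocessorService(stubMaker, callable) shape comes from this API;
    // the service and message types are hypothetical.
    CompletableFuture<PingResponse> resp = admin.coprocessorService(
        MyService::newStub,
        (stub, controller, rpcCallback) ->
            stub.ping(controller, PingRequest.getDefaultInstance(), rpcCallback));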

http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
index 1b8765c..a44bafa 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
@@ -29,12 +29,12 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
 
-  private final AsyncTableBuilder<? extends AsyncTableBase> tableBuilder;
+  private final AsyncTableBuilder<?> tableBuilder;
 
   private long writeBufferSize;
 
   public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf,
-      AsyncTableBuilder<? extends AsyncTableBase> tableBuilder) {
+      AsyncTableBuilder<?> tableBuilder) {
     this.tableBuilder = tableBuilder;
     this.writeBufferSize = connConf.getWriteBufferSize();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
index f6f1ed6..ac159b4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
@@ -30,12 +30,12 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * The implementation of {@link AsyncBufferedMutator}. Simply wrap an {@link AsyncTableBase}.
+ * The implementation of {@link AsyncBufferedMutator}. Simply wrap an {@link AsyncTable}.
  */
 @InterfaceAudience.Private
 class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
 
-  private final AsyncTableBase table;
+  private final AsyncTable<?> table;
 
   private final long writeBufferSize;
 
@@ -47,7 +47,7 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
 
   private boolean closed;
 
-  AsyncBufferedMutatorImpl(AsyncTableBase table, long writeBufferSize) {
+  AsyncBufferedMutatorImpl(AsyncTable<?> table, long writeBufferSize) {
     this.table = table;
     this.writeBufferSize = writeBufferSize;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index 5268ec8..ac2d3d7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -53,7 +53,7 @@ class AsyncClientScanner {
 
   private final ScanMetrics scanMetrics;
 
-  private final RawScanResultConsumer consumer;
+  private final AdvancedScanResultConsumer consumer;
 
   private final TableName tableName;
 
@@ -71,7 +71,7 @@ class AsyncClientScanner {
 
   private final ScanResultCache resultCache;
 
-  public AsyncClientScanner(Scan scan, RawScanResultConsumer consumer, TableName tableName,
+  public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
       AsyncConnectionImpl conn, long pauseNs, int maxAttempts, long scanTimeoutNs,
       long rpcTimeoutNs, int startLogErrorsCnt) {
     if (scan.getStartRow() == null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
index 877c074..eda2394 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
@@ -49,32 +49,37 @@ public interface AsyncConnection extends Closeable {
   AsyncTableRegionLocator getRegionLocator(TableName tableName);
 
   /**
-   * Retrieve an {@link RawAsyncTable} implementation for accessing a table.
+   * Retrieve an {@link AsyncTable} implementation for accessing a table.
    * <p>
-   * The returned instance will use default configs. Use {@link #getRawTableBuilder(TableName)} if you
-   * want to customize some configs.
+   * The returned instance will use default configs. Use {@link #getTableBuilder(TableName)} if
+   * you want to customize some configs.
    * <p>
    * This method no longer checks table existence. An exception will be thrown if the table does not
    * exist only when the first operation is attempted.
+   * <p>
+   * The returned {@code CompletableFuture} will be finished directly in the rpc framework's
+   * callback thread, so typically you should not do any time-consuming work inside these methods.
+   * Also, the observer-style scan API will use {@link AdvancedScanResultConsumer}, which is
+   * designed for experts only. Only use it when you know what you are doing.
    * @param tableName the name of the table
-   * @return an RawAsyncTable to use for interactions with this table
-   * @see #getRawTableBuilder(TableName)
+   * @return an AsyncTable to use for interactions with this table
+   * @see #getTableBuilder(TableName)
    */
-  default RawAsyncTable getRawTable(TableName tableName) {
-    return getRawTableBuilder(tableName).build();
+  default AsyncTable<AdvancedScanResultConsumer> getTable(TableName tableName) {
+    return getTableBuilder(tableName).build();
   }
 
   /**
-   * Returns an {@link AsyncTableBuilder} for creating {@link RawAsyncTable}.
+   * Returns an {@link AsyncTableBuilder} for creating {@link AsyncTable}.
    * <p>
    * This method no longer checks table existence. An exception will be thrown if the table does not
    * exist only when the first operation is attempted.
    * @param tableName the name of the table
    */
-  AsyncTableBuilder<RawAsyncTable> getRawTableBuilder(TableName tableName);
+  AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName);
 
   /**
-   * Retrieve an AsyncTable implementation for accessing a table.
+   * Retrieve an {@link AsyncTable} implementation for accessing a table.
    * <p>
    * This method no longer checks table existence. An exception will be thrown if the table does not
    * exist only when the first operation is attempted.
@@ -82,7 +87,7 @@ public interface AsyncConnection extends Closeable {
    * @param pool the thread pool to use for executing callback
    * @return an AsyncTable to use for interactions with this table
    */
-  default AsyncTable getTable(TableName tableName, ExecutorService pool) {
+  default AsyncTable<ScanResultConsumer> getTable(TableName tableName, ExecutorService pool) {
     return getTableBuilder(tableName, pool).build();
   }
 
@@ -94,7 +99,7 @@ public interface AsyncConnection extends Closeable {
    * @param tableName the name of the table
    * @param pool the thread pool to use for executing callback
    */
-  AsyncTableBuilder<AsyncTable> getTableBuilder(TableName tableName, ExecutorService pool);
+  AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName, ExecutorService pool);
 
   /**
    * Retrieve an {@link AsyncAdmin} implementation to administer an HBase cluster.
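Editor's note: summarizing the connection-level changes above, a sketch of the two table flavors after the merge, assuming an existing AsyncConnection named conn (the pool and table name are illustrative):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
    import org.apache.hadoop.hbase.client.AsyncTable;
    import org.apache.hadoop.hbase.client.ScanResultConsumer;

    // Replaces the old getRawTable: callbacks run on rpc framework threads.
    AsyncTable<AdvancedScanResultConsumer> rawStyle = conn.getTable(TableName.valueOf("t"));

    // Replaces the old two-arg getTable: callbacks run on the supplied pool.
    ExecutorService pool = Executors.newFixedThreadPool(4);
    AsyncTable<ScanResultConsumer> pooled = conn.getTable(TableName.valueOf("t"), pool);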

http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index d5df785..f9f9659 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -97,7 +97,7 @@ class AsyncConnectionImpl implements AsyncConnection {
   private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();
 
   private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture =
-      new AtomicReference<>();
+    new AtomicReference<>();
 
   public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
       User user) {
@@ -108,8 +108,8 @@ class AsyncConnectionImpl implements AsyncConnection {
     this.rpcClient = RpcClientFactory.createClient(conf, clusterId);
     this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
     this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
-    this.rpcTimeout = (int) Math.min(Integer.MAX_VALUE,
-      TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
+    this.rpcTimeout =
+      (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
     this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
     this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
     if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
@@ -161,7 +161,7 @@ class AsyncConnectionImpl implements AsyncConnection {
     return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
   }
 
-  private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException{
+  private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException {
     return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
   }
 
@@ -172,38 +172,37 @@ class AsyncConnectionImpl implements AsyncConnection {
   }
 
   private void makeMasterStub(CompletableFuture<MasterService.Interface> future) {
-    registry.getMasterAddress().whenComplete(
-      (sn, error) -> {
-        if (sn == null) {
-          String msg = "ZooKeeper available but no active master location found";
-          LOG.info(msg);
-          this.masterStubMakeFuture.getAndSet(null).completeExceptionally(
-            new MasterNotRunningException(msg));
-          return;
-        }
-        try {
-          MasterService.Interface stub = createMasterStub(sn);
-          HBaseRpcController controller = getRpcController();
-          stub.isMasterRunning(controller, RequestConverter.buildIsMasterRunningRequest(),
-            new RpcCallback<IsMasterRunningResponse>() {
-              @Override
-              public void run(IsMasterRunningResponse resp) {
-                if (controller.failed() || resp == null
-                    || (resp != null && !resp.getIsMasterRunning())) {
-                  masterStubMakeFuture.getAndSet(null).completeExceptionally(
-                    new MasterNotRunningException("Master connection is not running anymore"));
-                } else {
-                  masterStub.set(stub);
-                  masterStubMakeFuture.set(null);
-                  future.complete(stub);
-                }
+    registry.getMasterAddress().whenComplete((sn, error) -> {
+      if (sn == null) {
+        String msg = "ZooKeeper available but no active master location found";
+        LOG.info(msg);
+        this.masterStubMakeFuture.getAndSet(null)
+            .completeExceptionally(new MasterNotRunningException(msg));
+        return;
+      }
+      try {
+        MasterService.Interface stub = createMasterStub(sn);
+        HBaseRpcController controller = getRpcController();
+        stub.isMasterRunning(controller, RequestConverter.buildIsMasterRunningRequest(),
+          new RpcCallback<IsMasterRunningResponse>() {
+            @Override
+            public void run(IsMasterRunningResponse resp) {
+              if (controller.failed() || resp == null ||
+                (resp != null && !resp.getIsMasterRunning())) {
+                masterStubMakeFuture.getAndSet(null).completeExceptionally(
+                  new MasterNotRunningException("Master connection is not running anymore"));
+              } else {
+                masterStub.set(stub);
+                masterStubMakeFuture.set(null);
+                future.complete(stub);
               }
-            });
-        } catch (IOException e) {
-          this.masterStubMakeFuture.getAndSet(null).completeExceptionally(
-            new IOException("Failed to create async master stub", e));
-        }
-      });
+            }
+          });
+      } catch (IOException e) {
+        this.masterStubMakeFuture.getAndSet(null)
+            .completeExceptionally(new IOException("Failed to create async master stub", e));
+      }
+    });
   }
 
   CompletableFuture<MasterService.Interface> getMasterStub() {
@@ -231,8 +230,8 @@ class AsyncConnectionImpl implements AsyncConnection {
           new RpcCallback<IsMasterRunningResponse>() {
             @Override
             public void run(IsMasterRunningResponse resp) {
-              if (controller.failed() || resp == null
-                  || (resp != null && !resp.getIsMasterRunning())) {
+              if (controller.failed() || resp == null ||
+                (resp != null && !resp.getIsMasterRunning())) {
                 makeMasterStub(future);
               } else {
                 future.complete(masterStub);
@@ -255,22 +254,23 @@ class AsyncConnectionImpl implements AsyncConnection {
   }
 
   @Override
-  public AsyncTableBuilder<RawAsyncTable> getRawTableBuilder(TableName tableName) {
-    return new AsyncTableBuilderBase<RawAsyncTable>(tableName, connConf) {
+  public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) {
+    return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) {
 
       @Override
-      public RawAsyncTable build() {
+      public AsyncTable<AdvancedScanResultConsumer> build() {
         return new RawAsyncTableImpl(AsyncConnectionImpl.this, this);
       }
     };
   }
 
   @Override
-  public AsyncTableBuilder<AsyncTable> getTableBuilder(TableName tableName, ExecutorService pool) {
-    return new AsyncTableBuilderBase<AsyncTable>(tableName, connConf) {
+  public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
+      ExecutorService pool) {
+    return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) {
 
       @Override
-      public AsyncTable build() {
+      public AsyncTable<ScanResultConsumer> build() {
         RawAsyncTableImpl rawTable = new RawAsyncTableImpl(AsyncConnectionImpl.this, this);
         return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool);
       }
@@ -293,7 +293,7 @@ class AsyncConnectionImpl implements AsyncConnection {
       @Override
       public AsyncAdmin build() {
         RawAsyncHBaseAdmin rawAdmin =
-            new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
+          new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
         return new AsyncHBaseAdmin(rawAdmin, pool);
       }
     };
@@ -301,7 +301,7 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   @Override
   public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
-    return new AsyncBufferedMutatorBuilderImpl(connConf, getRawTableBuilder(tableName));
+    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 5a20291..ab529a8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.RegionLoad;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable;
 import org.apache.hadoop.hbase.client.replication.TableCFs;
 import org.apache.hadoop.hbase.client.security.SecurityCapability;
 import org.apache.hadoop.hbase.quotas.QuotaFilter;
@@ -56,7 +55,7 @@ import org.apache.yetus.audience.InterfaceAudience;
  * @see AsyncConnection#getAdminBuilder(ExecutorService)
  */
 @InterfaceAudience.Private
-public class AsyncHBaseAdmin implements AsyncAdmin {
+class AsyncHBaseAdmin implements AsyncAdmin {
 
   private final RawAsyncHBaseAdmin rawAdmin;
 
@@ -705,13 +704,13 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
 
   @Override
   public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
-      CoprocessorCallable<S, R> callable) {
+      ServiceCaller<S, R> callable) {
     return wrap(rawAdmin.coprocessorService(stubMaker, callable));
   }
 
   @Override
   public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
-      CoprocessorCallable<S, R> callable, ServerName serverName) {
+      ServiceCaller<S, R> callable, ServerName serverName) {
     return wrap(rawAdmin.coprocessorService(stubMaker, callable, serverName));
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index 5bead20..2adafb6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.client;
 import static org.apache.hadoop.hbase.HConstants.CATALOG_FAMILY;
 import static org.apache.hadoop.hbase.HConstants.NINES;
 import static org.apache.hadoop.hbase.HConstants.ZEROES;
-import static org.apache.hadoop.hbase.HRegionInfo.createRegionName;
 import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
 import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
 import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
 
@@ -45,14 +45,13 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * The asynchronous locator for regions other than meta.
@@ -63,7 +62,7 @@ class AsyncNonMetaRegionLocator {
   private static final Log LOG = LogFactory.getLog(AsyncNonMetaRegionLocator.class);
 
   static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE =
-      "hbase.client.meta.max.concurrent.locate.per.table";
+    "hbase.client.meta.max.concurrent.locate.per.table";
 
   private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8;
 
@@ -102,12 +101,12 @@ class AsyncNonMetaRegionLocator {
   private static final class TableCache {
 
     public final ConcurrentNavigableMap<byte[], HRegionLocation> cache =
-        new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
+      new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
 
     public final Set<LocateRequest> pendingRequests = new HashSet<>();
 
     public final Map<LocateRequest, CompletableFuture<HRegionLocation>> allRequests =
-        new LinkedHashMap<>();
+      new LinkedHashMap<>();
 
     public boolean hasQuota(int max) {
       return pendingRequests.size() < max;
@@ -126,8 +125,8 @@ class AsyncNonMetaRegionLocator {
     }
 
     public void clearCompletedRequests(Optional<HRegionLocation> location) {
-      for (Iterator<Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>>> iter = allRequests
-          .entrySet().iterator(); iter.hasNext();) {
+      for (Iterator<Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>>> iter =
+        allRequests.entrySet().iterator(); iter.hasNext();) {
         Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>> entry = iter.next();
         if (tryComplete(entry.getKey(), entry.getValue(), location)) {
           iter.remove();
@@ -146,15 +145,16 @@ class AsyncNonMetaRegionLocator {
       HRegionLocation loc = location.get();
       boolean completed;
       if (req.locateType.equals(RegionLocateType.BEFORE)) {
-        // for locating the row before current row, the common case is to find the previous region in
-        // reverse scan, so we check the endKey first. In general, the condition should be startKey <
-        // req.row and endKey >= req.row. Here we split it to endKey == req.row || (endKey > req.row
-        // && startKey < req.row). The two conditions are equal since startKey < endKey.
-        int c = Bytes.compareTo(loc.getRegionInfo().getEndKey(), req.row);
+        // for locating the row before current row, the common case is to find the previous region
+        // in reverse scan, so we check the endKey first. In general, the condition should be
+        // startKey < req.row and endKey >= req.row. Here we split it to endKey == req.row ||
+        // (endKey > req.row && startKey < req.row). The two conditions are equivalent since
+        // startKey < endKey.
+        int c = Bytes.compareTo(loc.getRegion().getEndKey(), req.row);
         completed =
-            c == 0 || (c > 0 && Bytes.compareTo(loc.getRegionInfo().getStartKey(), req.row) < 0);
+          c == 0 || (c > 0 && Bytes.compareTo(loc.getRegion().getStartKey(), req.row) < 0);
       } else {
-        completed = loc.getRegionInfo().containsRow(req.row);
+        completed = loc.getRegion().containsRow(req.row);
       }
       if (completed) {
         future.complete(loc);
@@ -176,13 +176,13 @@ class AsyncNonMetaRegionLocator {
   }
 
   private void removeFromCache(HRegionLocation loc) {
-    TableCache tableCache = cache.get(loc.getRegionInfo().getTable());
+    TableCache tableCache = cache.get(loc.getRegion().getTable());
     if (tableCache == null) {
       return;
     }
-    tableCache.cache.computeIfPresent(loc.getRegionInfo().getStartKey(), (k, oldLoc) -> {
+    tableCache.cache.computeIfPresent(loc.getRegion().getStartKey(), (k, oldLoc) -> {
       if (oldLoc.getSeqNum() > loc.getSeqNum() ||
-          !oldLoc.getServerName().equals(loc.getServerName())) {
+        !oldLoc.getServerName().equals(loc.getServerName())) {
         return oldLoc;
       }
       return null;
@@ -194,16 +194,16 @@ class AsyncNonMetaRegionLocator {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Try adding " + loc + " to cache");
     }
-    byte[] startKey = loc.getRegionInfo().getStartKey();
+    byte[] startKey = loc.getRegion().getStartKey();
     HRegionLocation oldLoc = tableCache.cache.putIfAbsent(startKey, loc);
     if (oldLoc == null) {
       return true;
     }
     if (oldLoc.getSeqNum() > loc.getSeqNum() ||
-        oldLoc.getServerName().equals(loc.getServerName())) {
+      oldLoc.getServerName().equals(loc.getServerName())) {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc +
-            " is newer than us or has the same server name");
+          " is newer than us or has the same server name");
       }
       return false;
     }
@@ -213,8 +213,8 @@ class AsyncNonMetaRegionLocator {
       }
       if (LOG.isTraceEnabled()) {
         LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue +
-            " is newer than us or has the same server name." +
-            " Maybe it is updated before we replace it");
+          " is newer than us or has the same server name." +
+          " Maybe it is updated before we replace it");
       }
       return oldValue;
     });
@@ -223,7 +223,7 @@ class AsyncNonMetaRegionLocator {
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
       justification = "Called by lambda expression")
   private void addToCache(HRegionLocation loc) {
-    addToCache(getTableCache(loc.getRegionInfo().getTable()), loc);
+    addToCache(getTableCache(loc.getRegion().getTable()), loc);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Try adding " + loc + " to cache");
     }
@@ -232,9 +232,8 @@ class AsyncNonMetaRegionLocator {
   private void complete(TableName tableName, LocateRequest req, HRegionLocation loc,
       Throwable error) {
     if (error != null) {
-      LOG.warn(
-        "Failed to locate region in '" + tableName + "', row='" + Bytes.toStringBinary(req.row)
-            + "', locateType=" + req.locateType, error);
+      LOG.warn("Failed to locate region in '" + tableName + "', row='" +
+        Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, error);
     }
     Optional<LocateRequest> toSend = Optional.empty();
     TableCache tableCache = getTableCache(tableName);
@@ -283,7 +282,7 @@ class AsyncNonMetaRegionLocator {
     RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0));
     if (LOG.isDebugEnabled()) {
       LOG.debug("The fetched location of '" + tableName + "', row='" +
-          Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType + " is " + locs);
+        Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType + " is " + locs);
     }
     if (locs == null || locs.getDefaultRegionLocation() == null) {
       complete(tableName, req, null,
@@ -292,7 +291,7 @@ class AsyncNonMetaRegionLocator {
       return;
     }
     HRegionLocation loc = locs.getDefaultRegionLocation();
-    HRegionInfo info = loc.getRegionInfo();
+    RegionInfo info = loc.getRegion();
     if (info == null) {
       complete(tableName, req, null,
         new IOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
@@ -308,12 +307,12 @@ class AsyncNonMetaRegionLocator {
       complete(tableName, req, null,
         new RegionOfflineException(
             "the only available region for the required row is a split parent," +
-                " the daughters should be online soon: '" + info.getRegionNameAsString() + "'"));
+              " the daughters should be online soon: '" + info.getRegionNameAsString() + "'"));
       return;
     }
     if (info.isOffline()) {
       complete(tableName, req, null, new RegionOfflineException("the region is offline, could" +
-          " be caused by a disable table call: '" + info.getRegionNameAsString() + "'"));
+        " be caused by a disable table call: '" + info.getRegionNameAsString() + "'"));
       return;
     }
     if (loc.getServerName() == null) {
@@ -332,11 +331,11 @@ class AsyncNonMetaRegionLocator {
       return null;
     }
     HRegionLocation loc = entry.getValue();
-    byte[] endKey = loc.getRegionInfo().getEndKey();
+    byte[] endKey = loc.getRegion().getEndKey();
     if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" +
-            Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.CURRENT);
+          Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.CURRENT);
       }
       return loc;
     } else {
@@ -347,16 +346,16 @@ class AsyncNonMetaRegionLocator {
   private HRegionLocation locateRowBeforeInCache(TableCache tableCache, TableName tableName,
       byte[] row) {
     Map.Entry<byte[], HRegionLocation> entry =
-        isEmptyStopRow(row) ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
+      isEmptyStopRow(row) ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
     if (entry == null) {
       return null;
     }
     HRegionLocation loc = entry.getValue();
-    if (isEmptyStopRow(loc.getRegionInfo().getEndKey()) ||
-        Bytes.compareTo(loc.getRegionInfo().getEndKey(), row) >= 0) {
+    if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
+      Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0) {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" +
-            Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.BEFORE);
+          Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.BEFORE);
       }
       return loc;
     } else {
@@ -367,7 +366,7 @@ class AsyncNonMetaRegionLocator {
   private void locateInMeta(TableName tableName, LocateRequest req) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) +
-          "', locateType=" + req.locateType + " in meta");
+        "', locateType=" + req.locateType + " in meta");
     }
     byte[] metaKey;
     if (req.locateType.equals(RegionLocateType.BEFORE)) {
@@ -380,7 +379,7 @@ class AsyncNonMetaRegionLocator {
     } else {
       metaKey = createRegionName(tableName, req.row, NINES, false);
     }
-    conn.getRawTable(META_TABLE_NAME)
+    conn.getTable(META_TABLE_NAME)
         .scanAll(new Scan().withStartRow(metaKey).setReversed(true).addFamily(CATALOG_FAMILY)
             .setOneRowLimit())
         .whenComplete((results, error) -> onScanComplete(tableName, req, results, error));
@@ -389,8 +388,8 @@ class AsyncNonMetaRegionLocator {
   private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row,
       RegionLocateType locateType) {
     return locateType.equals(RegionLocateType.BEFORE)
-        ? locateRowBeforeInCache(tableCache, tableName, row)
-        : locateRowInCache(tableCache, tableName, row);
+      ? locateRowBeforeInCache(tableCache, tableName, row)
+      : locateRowInCache(tableCache, tableName, row);
   }
 
   // locateToPrevious is true means we will use the start key of a region to locate the region
@@ -451,11 +450,11 @@ class AsyncNonMetaRegionLocator {
 
   void updateCachedLocation(HRegionLocation loc, Throwable exception) {
     AsyncRegionLocator.updateCachedLocation(loc, exception, l -> {
-      TableCache tableCache = cache.get(l.getRegionInfo().getTable());
+      TableCache tableCache = cache.get(l.getRegion().getTable());
       if (tableCache == null) {
         return null;
       }
-      return tableCache.cache.get(l.getRegionInfo().getStartKey());
+      return tableCache.cache.get(l.getRegion().getStartKey());
     }, this::addToCache, this::removeFromCache);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 9c45883..5eceb2d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -153,7 +153,7 @@ class AsyncRpcRetryingCallerFactory {
 
     private ScanResultCache resultCache;
 
-    private RawScanResultConsumer consumer;
+    private AdvancedScanResultConsumer consumer;
 
     private ClientService.Interface stub;
 
@@ -192,7 +192,7 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
-    public ScanSingleRegionCallerBuilder consumer(RawScanResultConsumer consumer) {
+    public ScanSingleRegionCallerBuilder consumer(AdvancedScanResultConsumer consumer) {
       this.consumer = consumer;
       return this;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index ec21275..51c243a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -28,14 +28,8 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
-
-import org.apache.hadoop.hbase.shaded.io.netty.util.HashedWheelTimer;
-import org.apache.hadoop.hbase.shaded.io.netty.util.Timeout;
-
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -47,13 +41,18 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client.RawScanResultConsumer.ScanResumer;
+import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.shaded.io.netty.util.HashedWheelTimer;
+import org.apache.hadoop.hbase.shaded.io.netty.util.Timeout;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
@@ -61,7 +60,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientServ
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
  * Retry caller for scanning a region.
@@ -84,7 +82,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
   private final ScanResultCache resultCache;
 
-  private final RawScanResultConsumer consumer;
+  private final AdvancedScanResultConsumer consumer;
 
   private final ClientService.Interface stub;
 
@@ -143,7 +141,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 // Notice that the public methods of this class are supposed to be called by the upper layer only, and
   // package private methods can only be called within the implementation of
   // AsyncScanSingleRegionRpcRetryingCaller.
-  private final class ScanControllerImpl implements RawScanResultConsumer.ScanController {
+  private final class ScanControllerImpl implements AdvancedScanResultConsumer.ScanController {
 
     // Make sure the methods are only called in this thread.
     private final Thread callerThread;
@@ -217,7 +215,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 // Notice that the public methods of this class are supposed to be called by the upper layer only, and
   // package private methods can only be called within the implementation of
   // AsyncScanSingleRegionRpcRetryingCaller.
-  private final class ScanResumerImpl implements RawScanResultConsumer.ScanResumer {
+  private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {
 
     // INITIALIZED -> SUSPENDED -> RESUMED
     // INITIALIZED -> RESUMED
@@ -301,7 +299,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
 
   public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer,
       AsyncConnectionImpl conn, Scan scan, ScanMetrics scanMetrics, long scannerId,
-      ScanResultCache resultCache, RawScanResultConsumer consumer, Interface stub,
+      ScanResultCache resultCache, AdvancedScanResultConsumer consumer, Interface stub,
       HRegionLocation loc, boolean isRegionServerRemote, long scannerLeaseTimeoutPeriodNs,
       long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
@@ -344,8 +342,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     stub.scan(controller, req, resp -> {
       if (controller.failed()) {
         LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId +
-            " for " + loc.getRegionInfo().getEncodedName() + " of " +
-            loc.getRegionInfo().getTable() + " failed, ignore, probably already closed",
+            " for " + loc.getRegion().getEncodedName() + " of " +
+            loc.getRegion().getTable() + " failed, ignore, probably already closed",
           controller.getFailed());
       }
     });
@@ -384,7 +382,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     error = translateException(error);
     if (tries > startLogErrorsCnt) {
       LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for " +
-          loc.getRegionInfo().getEncodedName() + " of " + loc.getRegionInfo().getTable() +
+          loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable() +
           " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " +
           TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() +
           " ms",
@@ -433,18 +431,18 @@ class AsyncScanSingleRegionRpcRetryingCaller {
   }
 
   private void completeWhenNoMoreResultsInRegion() {
-    if (noMoreResultsForScan(scan, loc.getRegionInfo())) {
+    if (noMoreResultsForScan(scan, loc.getRegion())) {
       completeNoMoreResults();
     } else {
-      completeWithNextStartRow(loc.getRegionInfo().getEndKey(), true);
+      completeWithNextStartRow(loc.getRegion().getEndKey(), true);
     }
   }
 
   private void completeReversedWhenNoMoreResultsInRegion() {
-    if (noMoreResultsForReverseScan(scan, loc.getRegionInfo())) {
+    if (noMoreResultsForReverseScan(scan, loc.getRegion())) {
       completeNoMoreResults();
     } else {
-      completeWithNextStartRow(loc.getRegionInfo().getStartKey(), false);
+      completeWithNextStartRow(loc.getRegion().getStartKey(), false);
     }
   }
 

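The ScanControllerImpl/ScanResumerImpl pair above is what gives a consumer flow control over a running scan. Below is a sketch of the consumer side, assuming controller.suspend() hands back the ScanResumer whose INITIALIZED -> SUSPENDED -> RESUMED state machine is implemented above; the buffer, threshold and asynchronous drain are illustrative only.

import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.Result;

// Applies backpressure: suspends the scan when the local buffer fills up and
// resumes it after the buffered results have been handed off for processing.
class BufferingConsumer implements AdvancedScanResultConsumer {

  private static final int MAX_BUFFERED = 1024; // illustrative threshold
  private final Queue<Result> buffer = new ConcurrentLinkedQueue<>();

  @Override
  public void onNext(Result[] results, ScanController controller) {
    for (Result r : results) {
      buffer.add(r);
    }
    if (buffer.size() >= MAX_BUFFERED) {
      // suspend() must be called inside onNext/onHeartbeat; resume() restarts prefetching.
      ScanResumer resumer = controller.suspend();
      CompletableFuture.runAsync(() -> {
        drain();          // stand-in for real processing on an application thread
        resumer.resume(); // SUSPENDED -> RESUMED, per the state machine above
      });
    }
  }

  private void drain() {
    while (buffer.poll() != null) {
      // consume the result
    }
  }

  @Override
  public void onError(Throwable error) {
  }

  @Override
  public void onComplete() {
  }
}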
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index d448e5a..ddedc3b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -68,7 +68,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
     } catch (IOException e) {
       onError(e,
         () -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row)
-            + "' in " + loc.getRegionInfo().getEncodedName() + " of " + tableName + " failed",
+            + "' in " + loc.getRegion().getEncodedName() + " of " + tableName + " failed",
         err -> conn.getLocator().updateCachedLocation(loc, err));
       return;
     }
@@ -78,7 +78,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
         if (error != null) {
           onError(error,
             () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in "
-                + loc.getRegionInfo().getEncodedName() + " of " + tableName + " failed",
+                + loc.getRegion().getEncodedName() + " of " + tableName + " failed",
             err -> conn.getLocator().updateCachedLocation(loc, err));
           return;
         }

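The getRegionInfo() -> getRegion() substitutions in this file and the one above track HRegionLocation now exposing the RegionInfo interface; for callers the change is one-for-one. A small sketch (the helper class and method names are made up):

import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.RegionInfo;

final class RegionLogging {

  // Builds the "<encoded name> of <table>" fragment used in the log messages above.
  static String describe(HRegionLocation loc) {
    RegionInfo region = loc.getRegion(); // replaces loc.getRegionInfo()
    return region.getEncodedName() + " of " + region.getTable();
  }
}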
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index 0c72c14..b3ccb15 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -17,20 +17,277 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.allOf;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;
+
+import com.google.protobuf.RpcChannel;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
+
 /**
- * The asynchronous table for normal users.
+ * The interface for the asynchronous version of Table. Obtain an instance from a
+ * {@link AsyncConnection}.
  * <p>
  * The implementation is required to be thread safe.
  * <p>
- * The implementation should make sure that user can do everything they want to the returned
- * {@code CompletableFuture} without breaking anything. Usually the implementation will require user
- * to provide a {@code ExecutorService}.
+ * Usually the implementation will not throw any exception directly. You need to get the exception
+ * from the returned {@link CompletableFuture}.
  * @since 2.0.0
  */
 @InterfaceAudience.Public
-public interface AsyncTable extends AsyncTableBase {
+public interface AsyncTable<C extends ScanResultConsumerBase> {
+
+  /**
+   * Gets the fully qualified table name instance of this table.
+   */
+  TableName getName();
+
+  /**
+   * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
+   * <p>
+   * The reference returned is not a copy, so any change made to it will affect this instance.
+   */
+  Configuration getConfiguration();
+
+  /**
+   * Get the timeout of each rpc request in this Table instance. It will be overridden by a more
+   * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout.
+   * @see #getReadRpcTimeout(TimeUnit)
+   * @see #getWriteRpcTimeout(TimeUnit)
+   * @param unit the unit of time in which the timeout is to be represented
+   * @return rpc timeout in the specified time unit
+   */
+  long getRpcTimeout(TimeUnit unit);
+
+  /**
+   * Get the timeout of each rpc read request in this Table instance.
+   * @param unit the unit of time in which the timeout is to be represented
+   * @return read rpc timeout in the specified time unit
+   */
+  long getReadRpcTimeout(TimeUnit unit);
+
+  /**
+   * Get the timeout of each rpc write request in this Table instance.
+   * @param unit the unit of time in which the timeout is to be represented
+   * @return write rpc timeout in the specified time unit
+   */
+  long getWriteRpcTimeout(TimeUnit unit);
+
+  /**
+   * Get the timeout of each operation in this Table instance.
+   * @param unit the unit of time in which the timeout is to be represented
+   * @return operation rpc timeout in the specified time unit
+   */
+  long getOperationTimeout(TimeUnit unit);
+
+  /**
+   * Get the timeout of a single operation in a scan. It works like the operation timeout for
+   * other operations.
+   * @param unit the unit of time in which the timeout is to be represented
+   * @return scan rpc timeout in the specified time unit
+   */
+  long getScanTimeout(TimeUnit unit);
+
+  /**
+   * Test for the existence of columns in the table, as specified by the Get.
+   * <p>
+   * This will return true if the Get matches one or more keys, false if not.
+   * <p>
+   * This is a server-side call so it prevents any data from being transferred to the client.
+   * @return true if the specified Get matches one or more keys, false if not. The return value will
+   *         be wrapped by a {@link CompletableFuture}.
+   */
+  default CompletableFuture<Boolean> exists(Get get) {
+    return get(toCheckExistenceOnly(get)).thenApply(r -> r.getExists());
+  }
+
+  /**
+   * Extracts certain cells from a given row.
+   * @param get The object that specifies what data to fetch and from which row.
+   * @return The data coming from the specified row, if it exists. If the row specified doesn't
+   *         exist, the {@link Result} instance returned won't contain any
+   *         {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The
+   *         return value will be wrapped by a {@link CompletableFuture}.
+   */
+  CompletableFuture<Result> get(Get get);
+
+  /**
+   * Puts some data in the table.
+   * @param put The data to put.
+   * @return A {@link CompletableFuture} that always returns null when complete normally.
+   */
+  CompletableFuture<Void> put(Put put);
+
+  /**
+   * Deletes the specified cells/row.
+   * @param delete The object that specifies what to delete.
+   * @return A {@link CompletableFuture} that always returns null when complete normally.
+   */
+  CompletableFuture<Void> delete(Delete delete);
+
+  /**
+   * Appends values to one or more columns within a single row.
+   * <p>
+   * This operation does not appear atomic to readers. Appends are done under a single row lock, so
+   * write operations to a row are synchronized, but readers do not take row locks so get and scan
+   * operations can see this operation partially completed.
+   * @param append object that specifies the columns and values to be used for the append
+   *          operation
+   * @return values of columns after the append operation (may be null). The return value will be
+   *         wrapped by a {@link CompletableFuture}.
+   */
+  CompletableFuture<Result> append(Append append);
+
+  /**
+   * Increments one or more columns within a single row.
+   * <p>
+   * This operation does not appear atomic to readers. Increments are done under a single row lock,
+   * so write operations to a row are synchronized, but readers do not take row locks so get and
+   * scan operations can see this operation partially completed.
+   * @param increment object that specifies the columns and amounts to be used for the increment
+   *          operations
+   * @return values of columns after the increment. The return value will be wrapped by a
+   *         {@link CompletableFuture}.
+   */
+  CompletableFuture<Result> increment(Increment increment);
+
+  /**
+   * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
+   * <p>
+   * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}.
+   * @param row The row that contains the cell to increment.
+   * @param family The column family of the cell to increment.
+   * @param qualifier The column qualifier of the cell to increment.
+   * @param amount The amount to increment the cell with (or decrement, if the amount is negative).
+   * @return The new value, post increment. The return value will be wrapped by a
+   *         {@link CompletableFuture}.
+   */
+  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
+      long amount) {
+    return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
+  }
+
+  /**
+   * Atomically increments a column value. If the column value already exists and is not a
+   * big-endian long, this could throw an exception. If the column value does not yet exist it is
+   * initialized to <code>amount</code> and written to the specified column.
+   * <p>
+   * Setting durability to {@link Durability#SKIP_WAL} means that in a failure scenario you will lose
+   * any increments that have not been flushed.
+   * @param row The row that contains the cell to increment.
+   * @param family The column family of the cell to increment.
+   * @param qualifier The column qualifier of the cell to increment.
+   * @param amount The amount to increment the cell with (or decrement, if the amount is negative).
+   * @param durability The persistence guarantee for this increment.
+   * @return The new value, post increment. The return value will be wrapped by a
+   *         {@link CompletableFuture}.
+   */
+  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
+      long amount, Durability durability) {
+    Preconditions.checkNotNull(row, "row is null");
+    Preconditions.checkNotNull(family, "family is null");
+    return increment(
+      new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
+          .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
+  }
+
+  /**
+   * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
+   * adds the Put/Delete/RowMutations.
+   * <p>
+   * Use the returned {@link CheckAndMutateBuilder} to construct your request and then execute it.
+   * This is a fluent-style API; the code looks like:
+   *
+   * <pre>
+   * <code>
+   * table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put)
+   *     .thenAccept(succ -> {
+   *       if (succ) {
+   *         System.out.println("Check and put succeeded");
+   *       } else {
+   *         System.out.println("Check and put failed");
+   *       }
+   *     });
+   * </code>
+   * </pre>
+   */
+  CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family);
+
+  /**
+   * A helper class for sending checkAndMutate request.
+   */
+  interface CheckAndMutateBuilder {
+
+    /**
+     * @param qualifier column qualifier to check.
+     */
+    CheckAndMutateBuilder qualifier(byte[] qualifier);
+
+    /**
+     * Check for lack of column.
+     */
+    CheckAndMutateBuilder ifNotExists();
+
+    default CheckAndMutateBuilder ifEquals(byte[] value) {
+      return ifMatches(CompareOperator.EQUAL, value);
+    }
+
+    /**
+     * @param compareOp comparison operator to use
+     * @param value the expected value
+     */
+    CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value);
+
+    /**
+     * @param put data to put if check succeeds
+     * @return {@code true} if the new put was executed, {@code false} otherwise. The return value
+     *         will be wrapped by a {@link CompletableFuture}.
+     */
+    CompletableFuture<Boolean> thenPut(Put put);
+
+    /**
+     * @param delete data to delete if check succeeds
+     * @return {@code true} if the new delete was executed, {@code false} otherwise. The return
+     *         value will be wrapped by a {@link CompletableFuture}.
+     */
+    CompletableFuture<Boolean> thenDelete(Delete delete);
+
+    /**
+     * @param mutation mutations to perform if check succeeds
+     * @return {@code true} if the new mutation was executed, {@code false} otherwise. The return
+     *         value will be wrapped by a {@link CompletableFuture}.
+     */
+    CompletableFuture<Boolean> thenMutate(RowMutations mutation);
+  }
+
+  /**
+   * Performs multiple mutations atomically on a single row. Currently {@link Put} and
+   * {@link Delete} are supported.
+   * @param mutation object that specifies the set of mutations to perform atomically
+   * @return A {@link CompletableFuture} that always returns null when complete normally.
+   */
+  CompletableFuture<Void> mutateRow(RowMutations mutation);
+
+  /**
+   * The scan API uses the observer pattern.
+   * @param scan A configured {@link Scan} object.
+   * @param consumer the consumer used to receive results.
+   * @see ScanResultConsumer
+   * @see AdvancedScanResultConsumer
+   */
+  void scan(Scan scan, C consumer);
 
   /**
    * Gets a scanner on the current table for the given family.
@@ -59,13 +316,300 @@ public interface AsyncTable extends AsyncTableBase {
   ResultScanner getScanner(Scan scan);
 
   /**
-   * The scan API uses the observer pattern. All results that match the given scan object will be
-   * passed to the given {@code consumer} by calling {@link ScanResultConsumer#onNext(Result)}.
-   * {@link ScanResultConsumer#onComplete()} means the scan is finished, and
-   * {@link ScanResultConsumer#onError(Throwable)} means we hit an unrecoverable error and the scan
-   * is terminated.
-   * @param scan A configured {@link Scan} object.
-   * @param consumer the consumer used to receive results.
+   * Return all the results that match the given scan object.
+   * <p>
+   * Notice that usually you should use this method with a {@link Scan} object that has a limit set.
+   * For example, if you want to get the closest row after a given row, you could do this:
+   * <p>
+   *
+   * <pre>
+   * <code>
+   * table.scanAll(new Scan().withStartRow(row, false).setLimit(1)).thenAccept(results -> {
+   *   if (results.isEmpty()) {
+   *      System.out.println("No row after " + Bytes.toStringBinary(row));
+   *   } else {
+   *     System.out.println("The closest row after " + Bytes.toStringBinary(row) + " is "
+   *         + Bytes.toStringBinary(results.stream().findFirst().get().getRow()));
+   *   }
+   * });
+   * </code>
+   * </pre>
+   * <p>
+   * If your result set is very large, you should use the other scan methods to get a scanner, or
+   * use a callback to process the results. They do chunking to prevent OOM. The scanAll method
+   * will fetch all the results, store them in a List, and then return the list to you.
+   * <p>
+   * The scan metrics will be collected in the background if you enable them, but you have no way
+   * to access them through this method. Usually you can get scan metrics from a
+   * {@code ResultScanner}, or through {@code ScanResultConsumer.onScanMetricsCreated}, but this
+   * method only returns a list of results. So if you really care about scan metrics, use one of
+   * the other scan methods, which return a {@code ResultScanner} or let you pass in a
+   * {@code ScanResultConsumer}. There is no performance difference between these scan methods.
+   * @param scan A configured {@link Scan} object. If you use this method to fetch a very large
+   *          result set, it is likely to cause an OOM.
+   * @return The results of the scan operation. The return value will be wrapped by a
+   *         {@link CompletableFuture}.
+   */
+  CompletableFuture<List<Result>> scanAll(Scan scan);
+
+  /**
+   * Test for the existence of columns in the table, as specified by the Gets.
+   * <p>
+   * This will return a list of booleans. Each value will be true if the related Get matches one or
+   * more keys, false if not.
+   * <p>
+   * This is a server-side call so it prevents any data from being transferred to the client.
+   * @param gets the Gets
+   * @return A list of {@link CompletableFuture}s that represent the existence for each get.
+   */
+  default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
+    return get(toCheckExistenceOnly(gets)).stream()
+        .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList());
+  }
+
+  /**
+   * A simple version of batch exists. It will fail if there are any failures; you will get the
+   * whole boolean result list at once if the operation succeeds.
+   * @param gets the Gets
+   * @return A {@link CompletableFuture} that wraps the boolean result list.
+   */
+  default CompletableFuture<List<Boolean>> existsAll(List<Get> gets) {
+    return allOf(exists(gets));
+  }
+
+  /**
+   * Extracts certain cells from the given rows, in batch.
+   * <p>
+   * Notice that you may not get all the results with this function, which means some of the
+   * returned {@link CompletableFuture}s may succeed while some of the other returned
+   * {@link CompletableFuture}s may fail.
+   * @param gets The objects that specify what data to fetch and from which rows.
+   * @return A list of {@link CompletableFuture}s that represent the result for each get.
+   */
+  List<CompletableFuture<Result>> get(List<Get> gets);
+
+  /**
+   * A simple version of batch get. It will fail if there are any failures; you will get the
+   * whole result list at once if the operation succeeds.
+   * @param gets The objects that specify what data to fetch and from which rows.
+   * @return A {@link CompletableFuture} that wraps the result list.
+   */
+  default CompletableFuture<List<Result>> getAll(List<Get> gets) {
+    return allOf(get(gets));
+  }
+
+  /**
+   * Puts some data in the table, in batch.
+   * @param puts The list of mutations to apply.
+   * @return A list of {@link CompletableFuture}s that represent the result for each put.
+   */
+  List<CompletableFuture<Void>> put(List<Put> puts);
+
+  /**
+   * A simple version of batch put. It will fail if there are any failures.
+   * @param puts The list of mutations to apply.
+   * @return A {@link CompletableFuture} that always returns null when complete normally.
+   */
+  default CompletableFuture<Void> putAll(List<Put> puts) {
+    return allOf(put(puts)).thenApply(r -> null);
+  }
+
+  /**
+   * Deletes the specified cells/rows in bulk.
+   * @param deletes list of things to delete.
+   * @return A list of {@link CompletableFuture}s that represent the result for each delete.
+   */
+  List<CompletableFuture<Void>> delete(List<Delete> deletes);
+
+  /**
+   * A simple version of batch delete. It will fail if there are any failures.
+   * @param deletes list of things to delete.
+   * @return A {@link CompletableFuture} that always returns null when complete normally.
+   */
+  default CompletableFuture<Void> deleteAll(List<Delete> deletes) {
+    return allOf(delete(deletes)).thenApply(r -> null);
+  }
+
+  /**
+   * Method that does a batch call on Deletes, Gets, Puts, Increments and Appends. The ordering of
+   * execution of the actions is not defined, meaning that if you do a Put and a Get in the same
+   * {@link #batch} call, you are not necessarily guaranteed that the Get returns what the Put
+   * had put.
+   * @param actions list of Get, Put, Delete, Increment, Append objects
+   * @return A list of {@link CompletableFuture}s that represent the result for each action.
+   */
+  <T> List<CompletableFuture<T>> batch(List<? extends Row> actions);
+
+  /**
+   * A simple version of batch. It will fail if there are any failures; you will get the whole
+   * result list at once if the operation succeeds.
+   * @param actions list of Get, Put, Delete, Increment, Append objects
+   * @return A list of the results of the actions, wrapped by a {@link CompletableFuture}.
+   */
+  default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
+    return allOf(batch(actions));
+  }
+
+  /**
+   * Execute the given coprocessor call on the region which contains the given {@code row}.
+   * <p>
+   * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
+   * one-line lambda expression, like:
+   *
+   * <pre>
+   * <code>
+   * channel -> xxxService.newStub(channel)
+   * </code>
+   * </pre>
+   *
+   * @param stubMaker a delegation to the actual {@code newStub} call.
+   * @param callable a delegation to the actual protobuf rpc call. See the comment of
+   *          {@link ServiceCaller} for more details.
+   * @param row The row key used to identify the remote region location
+   * @param <S> the type of the asynchronous stub
+   * @param <R> the type of the return value
+   * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
+   * @see ServiceCaller
+   */
+  <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
+      ServiceCaller<S, R> callable, byte[] row);
+
+  /**
+   * The callback when we want to execute a coprocessor call on a range of regions.
+   * <p>
+   * As the locating itself also takes some time, the implementation may want to send rpc calls on
+   * the fly, which means we do not know how many regions we have when we get the return values of
+   * the rpc calls, so we need an {@link #onComplete()} which is used to tell you that we have
+   * passed all the return values to you (through the {@link #onRegionComplete(RegionInfo, Object)}
+   * or {@link #onRegionError(RegionInfo, Throwable)} calls), i.e., there will be no more
+   * {@link #onRegionComplete(RegionInfo, Object)} or {@link #onRegionError(RegionInfo, Throwable)}
+   * calls in the future.
+   * <p>
+   * Here is pseudo code describing a typical implementation of a range coprocessor service
+   * method to help you better understand how the {@link CoprocessorCallback} will be called. The
+   * {@code callback} in the pseudo code is our {@link CoprocessorCallback}. And notice that the
+   * {@code whenComplete} is {@code CompletableFuture.whenComplete}.
+   *
+   * <pre>
+   * locateThenCall(byte[] row) {
+   *   locate(row).whenComplete((location, locateError) -> {
+   *     if (locateError != null) {
+   *       callback.onError(locateError);
+   *       return;
+   *     }
+   *     incPendingCall();
+   *     region = location.getRegion();
+   *     if (region.getEndKey() > endKey) {
+   *       locateEnd = true;
+   *     } else {
+   *       locateThenCall(region.getEndKey());
+   *     }
+   *     sendCall().whenComplete((resp, error) -> {
+   *       if (error != null) {
+   *         callback.onRegionError(region, error);
+   *       } else {
+   *         callback.onRegionComplete(region, resp);
+   *       }
+   *       if (locateEnd && decPendingCallAndGet() == 0) {
+   *         callback.onComplete();
+   *       }
+   *     });
+   *   });
+   * }
+   * </pre>
+   */
+  @InterfaceAudience.Public
+  interface CoprocessorCallback<R> {
+
+    /**
+     * @param region the region that the response belongs to
+     * @param resp the response of the coprocessor call
+     */
+    void onRegionComplete(RegionInfo region, R resp);
+
+    /**
+     * @param region the region that the error belongs to
+     * @param error the response error of the coprocessor call
+     */
+    void onRegionError(RegionInfo region, Throwable error);
+
+    /**
+     * Indicate that all the region responses have been delivered to you by calling
+     * {@link #onRegionComplete(RegionInfo, Object)} or
+     * {@link #onRegionError(RegionInfo, Throwable)}.
+     */
+    void onComplete();
+
+    /**
+     * Indicate that we got an error which does not belong to any region. Usually a locating error.
+     */
+    void onError(Throwable error);
+  }
+
+  /**
+   * Helper class for sending a coprocessorService request that executes a coprocessor call on regions
+   * which are covered by a range.
+   * <p>
+   * If {@code fromRow} is not specified, the selection will start with the first table region. If
+   * {@code toRow} is not specified, the selection will continue through the last table region.
+   * @param <S> the type of the protobuf Service you want to call.
+   * @param <R> the type of the return value.
+   */
+  interface CoprocessorServiceBuilder<S, R> {
+
+    /**
+     * @param startKey start region selection with the region containing this row, inclusive.
+     */
+    default CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey) {
+      return fromRow(startKey, true);
+    }
+
+    /**
+     * @param startKey start region selection with the region containing this row
+     * @param inclusive whether to include the startKey
+     */
+    CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive);
+
+    /**
+     * @param endKey select regions up to the region containing this row, exclusive.
+     */
+    default CoprocessorServiceBuilder<S, R> toRow(byte[] endKey) {
+      return toRow(endKey, false);
+    }
+
+    /**
+     * @param endKey select regions up to and including the region containing this row
+     * @param inclusive whether to include the endKey
+     */
+    CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive);
+
+    /**
+     * Execute the coprocessorService request. You can get the response through the
+     * {@link CoprocessorCallback}.
+     */
+    void execute();
+  }
+
+  /**
+   * Execute a coprocessor call on the regions which are covered by a range.
+   * <p>
+   * Use the returned {@link CoprocessorServiceBuilder} to construct your request and then execute it.
+   * <p>
+   * The {@code stubMaker} is just a delegation to the {@code xxxService.newStub} call. Usually it
+   * is only a one-line lambda expression, like:
+   *
+   * <pre>
+   * <code>
+   * channel -> xxxService.newStub(channel)
+   * </code>
+   * </pre>
+   *
+   * @param stubMaker a delegation to the actual {@code newStub} call.
+   * @param callable a delegation to the actual protobuf rpc call. See the comment of
+   *          {@link ServiceCaller} for more details.
+   * @param callback callback to get the response. See the comment of {@link CoprocessorCallback}
+   *          for more details.
    */
-  void scan(Scan scan, ScanResultConsumer consumer);
+  <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker,
+      ServiceCaller<S, R> callable, CoprocessorCallback<R> callback);
 }
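
Taken together, the type parameter on AsyncTable<C extends ScanResultConsumerBase> is what lets this single interface replace both RawAsyncTable and the old AsyncTable. A short usage sketch follows, assuming the AsyncConnection.getTable overloads introduced by this change (one without an executor returning AsyncTable<AdvancedScanResultConsumer>, one with an executor returning AsyncTable<ScanResultConsumer>); the table, family, qualifier and values are made up.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.ScanResultConsumer;
import org.apache.hadoop.hbase.util.Bytes;

final class AsyncTableUsage {

  static void demo(AsyncConnection conn) {
    // Without an executor: callbacks run on RPC framework threads and must not block.
    AsyncTable<?> rawFlavor = conn.getTable(TableName.valueOf("test"));
    System.out.println("raw flavor for " + rawFlavor.getName());

    // With an executor: callbacks are dispatched to the pool and may block.
    ExecutorService pool = Executors.newFixedThreadPool(4);
    AsyncTable<ScanResultConsumer> table = conn.getTable(TableName.valueOf("test"), pool);

    byte[] row = Bytes.toBytes("row-1");
    byte[] cf = Bytes.toBytes("cf");
    byte[] q = Bytes.toBytes("q");

    CompletableFuture<Void> done =
        table.put(new Put(row).addColumn(cf, q, Bytes.toBytes("v1")))
            .thenCompose(v -> table.get(new Get(row)))
            .thenCompose(r -> table.checkAndMutate(row, cf).qualifier(q)
                .ifEquals(Bytes.toBytes("v1"))
                .thenPut(new Put(row).addColumn(cf, q, Bytes.toBytes("v2"))))
            .thenAccept(ok -> System.out.println("check-and-put succeeded: " + ok));
    done.join(); // failures surface here, through the CompletableFuture
  }
}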
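
Continuing the sketch above (reusing its table and row), here is a hedged example of the single-row coprocessorService overload. ExampleService, ExampleRequest and ExampleResponse stand in for a generated protobuf service and its messages; the (stub, controller, done) lambda follows the ServiceCaller shape described in the javadoc.

// ExampleService, ExampleRequest and ExampleResponse are hypothetical generated
// protobuf classes; only the call pattern is the point here.
CompletableFuture<ExampleResponse> resp = table.coprocessorService(
    channel -> ExampleService.newStub(channel),                           // stubMaker
    (stub, controller, done) ->
        stub.call(controller, ExampleRequest.getDefaultInstance(), done), // ServiceCaller
    row);
resp.thenAccept(r -> System.out.println("coprocessor returned: " + r));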