Posted to commits@hbase.apache.org by zh...@apache.org on 2017/11/16 06:37:07 UTC

[2/3] hbase git commit: HBASE-19251 Merge RawAsyncTable and AsyncTable

http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
deleted file mode 100644
index 7d24c4f..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static java.util.stream.Collectors.toList;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.allOf;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;
-
-import org.apache.hadoop.hbase.CompareOperator;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * The base interface for the asynchronous version of Table. Obtain an instance from a
- * {@link AsyncConnection}.
- * <p>
- * The implementation is required to be thread safe.
- * <p>
- * Usually the implementation will not throw any exception directly. You need to get the exception
- * from the returned {@link CompletableFuture}.
- * @since 2.0.0
- */
-@InterfaceAudience.Public
-public interface AsyncTableBase {
-
-  /**
-   * Gets the fully qualified table name instance of this table.
-   */
-  TableName getName();
-
-  /**
-   * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
-   * <p>
-   * The reference returned is not a copy, so any change made to it will affect this instance.
-   */
-  Configuration getConfiguration();
-
-  /**
-   * Get timeout of each rpc request in this Table instance. It will be overridden by a more
-   * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout.
-   * @see #getReadRpcTimeout(TimeUnit)
-   * @see #getWriteRpcTimeout(TimeUnit)
-   * @param unit the unit of time the timeout to be represented in
-   * @return rpc timeout in the specified time unit
-   */
-  long getRpcTimeout(TimeUnit unit);
-
-  /**
-   * Get timeout of each rpc read request in this Table instance.
-   * @param unit the unit of time the timeout to be represented in
-   * @return read rpc timeout in the specified time unit
-   */
-  long getReadRpcTimeout(TimeUnit unit);
-
-  /**
-   * Get timeout of each rpc write request in this Table instance.
-   * @param unit the unit of time the timeout to be represented in
-   * @return write rpc timeout in the specified time unit
-   */
-  long getWriteRpcTimeout(TimeUnit unit);
-
-  /**
-   * Get the timeout of each operation in this Table instance.
-   * @param unit the unit of time the timeout to be represented in
-   * @return operation rpc timeout in the specified time unit
-   */
-  long getOperationTimeout(TimeUnit unit);
-
-  /**
-   * Get the timeout of a single operation in a scan. It works like operation timeout for other
-   * operations.
-   * @param unit the unit of time the timeout to be represented in
-   * @return scan rpc timeout in the specified time unit
-   */
-  long getScanTimeout(TimeUnit unit);
-
-  /**
-   * Test for the existence of columns in the table, as specified by the Get.
-   * <p>
-   * This will return true if the Get matches one or more keys, false if not.
-   * <p>
-   * This is a server-side call so it prevents any data from being transferred to the client.
-   * @return true if the specified Get matches one or more keys, false if not. The return value will
-   *         be wrapped by a {@link CompletableFuture}.
-   */
-  default CompletableFuture<Boolean> exists(Get get) {
-    return get(toCheckExistenceOnly(get)).thenApply(r -> r.getExists());
-  }
-
-  /**
-   * Extracts certain cells from a given row.
-   * @param get The object that specifies what data to fetch and from which row.
-   * @return The data coming from the specified row, if it exists. If the row specified doesn't
-   *         exist, the {@link Result} instance returned won't contain any
-   *         {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The
-   *         return value will be wrapped by a {@link CompletableFuture}.
-   */
-  CompletableFuture<Result> get(Get get);
-
-  /**
-   * Puts some data in the table.
-   * @param put The data to put.
-   * @return A {@link CompletableFuture} that always returns null when completed normally.
-   */
-  CompletableFuture<Void> put(Put put);
-
-  /**
-   * Deletes the specified cells/row.
-   * @param delete The object that specifies what to delete.
-   * @return A {@link CompletableFuture} that always returns null when completed normally.
-   */
-  CompletableFuture<Void> delete(Delete delete);
-
-  /**
-   * Appends values to one or more columns within a single row.
-   * <p>
-   * This operation does not appear atomic to readers. Appends are done under a single row lock, so
-   * write operations to a row are synchronized, but readers do not take row locks so get and scan
-   * operations can see this operation partially completed.
-   * @param append object that specifies the columns and values to be used for the append
-   *          operation
-   * @return values of columns after the append operation (may be null). The return value will be
-   *         wrapped by a {@link CompletableFuture}.
-   */
-  CompletableFuture<Result> append(Append append);
-
-  /**
-   * Increments one or more columns within a single row.
-   * <p>
-   * This operation does not appear atomic to readers. Increments are done under a single row lock,
-   * so write operations to a row are synchronized, but readers do not take row locks so get and
-   * scan operations can see this operation partially completed.
-   * @param increment object that specifies the columns and amounts to be used for the increment
-   *          operations
-   * @return values of columns after the increment. The return value will be wrapped by a
-   *         {@link CompletableFuture}.
-   */
-  CompletableFuture<Result> increment(Increment increment);
-
-  /**
-   * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
-   * <p>
-   * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}.
-   * @param row The row that contains the cell to increment.
-   * @param family The column family of the cell to increment.
-   * @param qualifier The column qualifier of the cell to increment.
-   * @param amount The amount to increment the cell with (or decrement, if the amount is negative).
-   * @return The new value, post increment. The return value will be wrapped by a
-   *         {@link CompletableFuture}.
-   */
-  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
-      long amount) {
-    return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
-  }
-
-  /**
-   * Atomically increments a column value. If the column value already exists and is not a
-   * big-endian long, this could throw an exception. If the column value does not yet exist it is
-   * initialized to <code>amount</code> and written to the specified column.
-   * <p>
-   * Setting durability to {@link Durability#SKIP_WAL} means that in a failure scenario you will
-   * lose any increments that have not been flushed.
-   * @param row The row that contains the cell to increment.
-   * @param family The column family of the cell to increment.
-   * @param qualifier The column qualifier of the cell to increment.
-   * @param amount The amount to increment the cell with (or decrement, if the amount is negative).
-   * @param durability The persistence guarantee for this increment.
-   * @return The new value, post increment. The return value will be wrapped by a
-   *         {@link CompletableFuture}.
-   */
-  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
-      long amount, Durability durability) {
-    Preconditions.checkNotNull(row, "row is null");
-    Preconditions.checkNotNull(family, "family is null");
-    return increment(
-      new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
-          .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
-  }
-
-  /**
-   * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
-   * adds the Put/Delete/RowMutations.
-   * <p>
-   * Use the returned {@link CheckAndMutateBuilder} to construct your request and then execute it.
-   * This is a fluent style API; the code looks like this:
-   *
-   * <pre>
-   * <code>
-   * table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put)
-   *     .thenAccept(succ -> {
-   *       if (succ) {
-   *         System.out.println("Check and put succeeded");
-   *       } else {
-   *         System.out.println("Check and put failed");
-   *       }
-   *     });
-   * </code>
-   * </pre>
-   */
-  CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family);
-
-  /**
-   * A helper class for sending a checkAndMutate request.
-   */
-  interface CheckAndMutateBuilder {
-
-    /**
-     * @param qualifier column qualifier to check.
-     */
-    CheckAndMutateBuilder qualifier(byte[] qualifier);
-
-    /**
-     * Check for lack of column.
-     */
-    CheckAndMutateBuilder ifNotExists();
-
-    default CheckAndMutateBuilder ifEquals(byte[] value) {
-      return ifMatches(CompareOperator.EQUAL, value);
-    }
-
-    /**
-     * @param compareOp comparison operator to use
-     * @param value the expected value
-     */
-    CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value);
-
-    /**
-     * @param put data to put if check succeeds
-     * @return {@code true} if the new put was executed, {@code false} otherwise. The return value
-     *         will be wrapped by a {@link CompletableFuture}.
-     */
-    CompletableFuture<Boolean> thenPut(Put put);
-
-    /**
-     * @param delete data to delete if check succeeds
-     * @return {@code true} if the new delete was executed, {@code false} otherwise. The return
-     *         value will be wrapped by a {@link CompletableFuture}.
-     */
-    CompletableFuture<Boolean> thenDelete(Delete delete);
-
-    /**
-     * @param mutation mutations to perform if check succeeds
-     * @return true if the new mutation was executed, false otherwise. The return value will be
-     *         wrapped by a {@link CompletableFuture}.
-     */
-    CompletableFuture<Boolean> thenMutate(RowMutations mutation);
-  }
-
-  /**
-   * Performs multiple mutations atomically on a single row. Currently {@link Put} and
-   * {@link Delete} are supported.
-   * @param mutation object that specifies the set of mutations to perform atomically
-   * @return A {@link CompletableFuture} that always returns null when completed normally.
-   */
-  CompletableFuture<Void> mutateRow(RowMutations mutation);
-
-  /**
-   * Return all the results that match the given scan object.
-   * <p>
-   * Notice that usually you should use this method with a {@link Scan} object that has a limit set.
-   * For example, if you want to get the closest row after a given row, you could do this:
-   * <p>
-   *
-   * <pre>
-   * <code>
-   * table.scanAll(new Scan().withStartRow(row, false).setLimit(1)).thenAccept(results -> {
-   *   if (results.isEmpty()) {
-   *      System.out.println("No row after " + Bytes.toStringBinary(row));
-   *   } else {
-   *     System.out.println("The closest row after " + Bytes.toStringBinary(row) + " is "
-   *         + Bytes.toStringBinary(results.stream().findFirst().get().getRow()));
-   *   }
-   * });
-   * </code>
-   * </pre>
-   * <p>
-   * If your result set is very large, you should use another scan method to get a scanner, or use
-   * a callback to process the results. Those methods do chunking to prevent OOM, whereas the
-   * scanAll method fetches all the results, stores them in a List, and then returns the list to
-   * you.
-   * <p>
-   * The scan metrics will be collected in the background if you enable them, but you have no way
-   * to get them here. Usually you can get scan metrics from a {@code ResultScanner}, or through
-   * {@code ScanResultConsumer.onScanMetricsCreated}, but this method only returns a list of
-   * results. So if you really care about scan metrics then you'd better use the other scan methods
-   * which return a {@code ResultScanner} or let you pass in a {@code ScanResultConsumer}. There is
-   * no performance difference between these scan methods so do not worry.
-   * @param scan A configured {@link Scan} object. Note that this method fetches the whole result
-   *          set at once, so using it on a really large result set is likely to cause OOM.
-   * @return The results of this scan operation. The return value will be wrapped by a
-   *         {@link CompletableFuture}.
-   */
-  CompletableFuture<List<Result>> scanAll(Scan scan);
-
-  /**
-   * Test for the existence of columns in the table, as specified by the Gets.
-   * <p>
-   * This will return a list of booleans. Each value will be true if the related Get matches one or
-   * more keys, false if not.
-   * <p>
-   * This is a server-side call so it prevents any data from being transferred to the client.
-   * @param gets the Gets
-   * @return A list of {@link CompletableFuture}s that represent the existence for each get.
-   */
-  default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
-    return get(toCheckExistenceOnly(gets)).stream()
-        .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList());
-  }
-
-  /**
-   * A simple version for batch exists. It will fail if there are any failures, and you will get
-   * the whole result boolean list at once if the operation succeeds.
-   * @param gets the Gets
-   * @return A {@link CompletableFuture} that wraps the result boolean list.
-   */
-  default CompletableFuture<List<Boolean>> existsAll(List<Get> gets) {
-    return allOf(exists(gets));
-  }
-
-  /**
-   * Extracts certain cells from the given rows, in batch.
-   * <p>
-   * Notice that you may not get all the results with this function, which means some of the
-   * returned {@link CompletableFuture}s may succeed while some of the other returned
-   * {@link CompletableFuture}s may fail.
-   * @param gets The objects that specify what data to fetch and from which rows.
-   * @return A list of {@link CompletableFuture}s that represent the result for each get.
-   */
-  List<CompletableFuture<Result>> get(List<Get> gets);
-
-  /**
-   * A simple version for batch get. It will fail if there are any failures, and you will get the
-   * whole result list at once if the operation succeeds.
-   * @param gets The objects that specify what data to fetch and from which rows.
-   * @return A {@link CompletableFuture} that wraps the result list.
-   */
-  default CompletableFuture<List<Result>> getAll(List<Get> gets) {
-    return allOf(get(gets));
-  }
-
-  /**
-   * Puts some data in the table, in batch.
-   * @param puts The list of mutations to apply.
-   * @return A list of {@link CompletableFuture}s that represent the result for each put.
-   */
-  List<CompletableFuture<Void>> put(List<Put> puts);
-
-  /**
-   * A simple version of batch put. It will fail if there are any failures.
-   * @param puts The list of mutations to apply.
-   * @return A {@link CompletableFuture} that always returns null when completed normally.
-   */
-  default CompletableFuture<Void> putAll(List<Put> puts) {
-    return allOf(put(puts)).thenApply(r -> null);
-  }
-
-  /**
-   * Deletes the specified cells/rows in bulk.
-   * @param deletes list of things to delete.
-   * @return A list of {@link CompletableFuture}s that represent the result for each delete.
-   */
-  List<CompletableFuture<Void>> delete(List<Delete> deletes);
-
-  /**
-   * A simple version of batch delete. It will fail if there are any failures.
-   * @param deletes list of things to delete.
-   * @return A {@link CompletableFuture} that always returns null when completed normally.
-   */
-  default CompletableFuture<Void> deleteAll(List<Delete> deletes) {
-    return allOf(delete(deletes)).thenApply(r -> null);
-  }
-
-  /**
-   * Method that does a batch call on Deletes, Gets, Puts, Increments and Appends. The ordering of
-   * execution of the actions is not defined; this means that if you do a Put and a Get in the same
-   * {@link #batch} call, you are not necessarily guaranteed that the Get returns what the Put had
-   * put.
-   * @param actions list of Get, Put, Delete, Increment, Append objects
-   * @return A list of {@link CompletableFuture}s that represent the result for each action.
-   */
-  <T> List<CompletableFuture<T>> batch(List<? extends Row> actions);
-
-  /**
-   * A simple version of batch. It will fail if there are any failures and you will get the whole
-   * result list at once if the operation succeeds.
-   * @param actions list of Get, Put, Delete, Increment, Append objects
-   * @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}.
-   */
-  default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
-    return allOf(batch(actions));
-  }
-}

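For orientation, everything above now lives on the merged AsyncTable interface rather than a separate AsyncTableBase. A minimal usage sketch against the merged API (the connection "conn", the pool and the table name are illustrative, not part of this commit):

    // Sketch: basic data access on the merged AsyncTable.
    AsyncTable<ScanResultConsumer> table = conn.getTable(TableName.valueOf("test"), pool);
    byte[] row = Bytes.toBytes("row1");
    byte[] cf = Bytes.toBytes("cf");
    byte[] q = Bytes.toBytes("q");
    table.put(new Put(row).addColumn(cf, q, Bytes.toBytes("v")))
        .thenCompose(v -> table.get(new Get(row)))
        .thenAccept(r -> System.out.println(Bytes.toString(r.getValue(cf, q))));

Because the table was obtained with a thread pool, the callbacks above run on that pool rather than on an rpc framework thread.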
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
index 9c5b092..6632ad5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * For creating {@link AsyncTable} or {@link RawAsyncTable}.
+ * For creating {@link AsyncTable}.
  * <p>
 * The implementation should have default configurations set before returning the builder to the
 * user. So users are free to only set the configs they care about to create a new
@@ -32,7 +32,7 @@ import org.apache.yetus.audience.InterfaceAudience;
  * @since 2.0.0
  */
 @InterfaceAudience.Public
-public interface AsyncTableBuilder<T extends AsyncTableBase> {
+public interface AsyncTableBuilder<C extends ScanResultConsumerBase> {
 
   /**
    * Set timeout for a whole operation such as get, put or delete. Notice that scan will not be
@@ -44,7 +44,7 @@ public interface AsyncTableBuilder<T extends AsyncTableBase> {
    * @see #setMaxRetries(int)
    * @see #setScanTimeout(long, TimeUnit)
    */
-  AsyncTableBuilder<T> setOperationTimeout(long timeout, TimeUnit unit);
+  AsyncTableBuilder<C> setOperationTimeout(long timeout, TimeUnit unit);
 
   /**
    * As now we have heartbeat support for scan, ideally a scan will never timeout unless the RS is
@@ -53,7 +53,7 @@ public interface AsyncTableBuilder<T extends AsyncTableBase> {
    * operation in a scan, such as openScanner or next.
    * @see #setScanTimeout(long, TimeUnit)
    */
-  AsyncTableBuilder<T> setScanTimeout(long timeout, TimeUnit unit);
+  AsyncTableBuilder<C> setScanTimeout(long timeout, TimeUnit unit);
 
   /**
    * Set timeout for each rpc request.
@@ -61,23 +61,23 @@ public interface AsyncTableBuilder<T extends AsyncTableBase> {
    * Notice that this will <strong>NOT</strong> change the rpc timeout for read (get, scan)
    * requests and write (put, delete) requests.
    */
-  AsyncTableBuilder<T> setRpcTimeout(long timeout, TimeUnit unit);
+  AsyncTableBuilder<C> setRpcTimeout(long timeout, TimeUnit unit);
 
   /**
    * Set timeout for each read(get, scan) rpc request.
    */
-  AsyncTableBuilder<T> setReadRpcTimeout(long timeout, TimeUnit unit);
+  AsyncTableBuilder<C> setReadRpcTimeout(long timeout, TimeUnit unit);
 
   /**
    * Set timeout for each write(put, delete) rpc request.
    */
-  AsyncTableBuilder<T> setWriteRpcTimeout(long timeout, TimeUnit unit);
+  AsyncTableBuilder<C> setWriteRpcTimeout(long timeout, TimeUnit unit);
 
   /**
    * Set the base pause time for retrying. We use an exponential policy to generate sleep time when
    * retrying.
    */
-  AsyncTableBuilder<T> setRetryPause(long pause, TimeUnit unit);
+  AsyncTableBuilder<C> setRetryPause(long pause, TimeUnit unit);
 
   /**
    * Set the max retry times for an operation. Usually it is the max attempt times minus 1.
@@ -87,7 +87,7 @@ public interface AsyncTableBuilder<T extends AsyncTableBase> {
    * @see #setMaxAttempts(int)
    * @see #setOperationTimeout(long, TimeUnit)
    */
-  default AsyncTableBuilder<T> setMaxRetries(int maxRetries) {
+  default AsyncTableBuilder<C> setMaxRetries(int maxRetries) {
     return setMaxAttempts(retries2Attempts(maxRetries));
   }
 
@@ -98,15 +98,15 @@ public interface AsyncTableBuilder<T extends AsyncTableBase> {
    * @see #setMaxRetries(int)
    * @see #setOperationTimeout(long, TimeUnit)
    */
-  AsyncTableBuilder<T> setMaxAttempts(int maxAttempts);
+  AsyncTableBuilder<C> setMaxAttempts(int maxAttempts);
 
   /**
    * Set the number of retries that are allowed before we start to log.
    */
-  AsyncTableBuilder<T> setStartLogErrorsCnt(int startLogErrorsCnt);
+  AsyncTableBuilder<C> setStartLogErrorsCnt(int startLogErrorsCnt);
 
   /**
-   * Create the {@link AsyncTable} or {@link RawAsyncTable} instance.
+   * Create the {@link AsyncTable} instance.
    */
-  T build();
+  AsyncTable<C> build();
 }

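A hedged sketch of configuring a table through this builder (connection and pool are illustrative):

    // Sketch: per-table timeouts and retries via the builder.
    AsyncTable<ScanResultConsumer> table =
        conn.getTableBuilder(TableName.valueOf("test"), pool)
            .setOperationTimeout(30, TimeUnit.SECONDS)
            .setReadRpcTimeout(5, TimeUnit.SECONDS)
            .setMaxRetries(10) // delegates to setMaxAttempts(retries2Attempts(10))
            .build();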
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
index 3fd6bde..ee571f1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
@@ -28,7 +28,8 @@ import org.apache.yetus.audience.InterfaceAudience;
  * Base class for all asynchronous table builders.
  */
 @InterfaceAudience.Private
-abstract class AsyncTableBuilderBase<T extends AsyncTableBase> implements AsyncTableBuilder<T> {
+abstract class AsyncTableBuilderBase<C extends ScanResultConsumerBase>
+    implements AsyncTableBuilder<C> {
 
   protected TableName tableName;
 
@@ -51,7 +52,7 @@ abstract class AsyncTableBuilderBase<T extends AsyncTableBase> implements AsyncT
   AsyncTableBuilderBase(TableName tableName, AsyncConnectionConfiguration connConf) {
     this.tableName = tableName;
     this.operationTimeoutNs = tableName.isSystemTable() ? connConf.getMetaOperationTimeoutNs()
-        : connConf.getOperationTimeoutNs();
+      : connConf.getOperationTimeoutNs();
     this.scanTimeoutNs = connConf.getScanTimeoutNs();
     this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
     this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs();
@@ -62,49 +63,49 @@ abstract class AsyncTableBuilderBase<T extends AsyncTableBase> implements AsyncT
   }
 
   @Override
-  public AsyncTableBuilderBase<T> setOperationTimeout(long timeout, TimeUnit unit) {
+  public AsyncTableBuilderBase<C> setOperationTimeout(long timeout, TimeUnit unit) {
     this.operationTimeoutNs = unit.toNanos(timeout);
     return this;
   }
 
   @Override
-  public AsyncTableBuilderBase<T> setScanTimeout(long timeout, TimeUnit unit) {
+  public AsyncTableBuilderBase<C> setScanTimeout(long timeout, TimeUnit unit) {
     this.scanTimeoutNs = unit.toNanos(timeout);
     return this;
   }
 
   @Override
-  public AsyncTableBuilderBase<T> setRpcTimeout(long timeout, TimeUnit unit) {
+  public AsyncTableBuilderBase<C> setRpcTimeout(long timeout, TimeUnit unit) {
     this.rpcTimeoutNs = unit.toNanos(timeout);
     return this;
   }
 
   @Override
-  public AsyncTableBuilderBase<T> setReadRpcTimeout(long timeout, TimeUnit unit) {
+  public AsyncTableBuilderBase<C> setReadRpcTimeout(long timeout, TimeUnit unit) {
     this.readRpcTimeoutNs = unit.toNanos(timeout);
     return this;
   }
 
   @Override
-  public AsyncTableBuilderBase<T> setWriteRpcTimeout(long timeout, TimeUnit unit) {
+  public AsyncTableBuilderBase<C> setWriteRpcTimeout(long timeout, TimeUnit unit) {
     this.writeRpcTimeoutNs = unit.toNanos(timeout);
     return this;
   }
 
   @Override
-  public AsyncTableBuilderBase<T> setRetryPause(long pause, TimeUnit unit) {
+  public AsyncTableBuilderBase<C> setRetryPause(long pause, TimeUnit unit) {
     this.pauseNs = unit.toNanos(pause);
     return this;
   }
 
   @Override
-  public AsyncTableBuilderBase<T> setMaxAttempts(int maxAttempts) {
+  public AsyncTableBuilderBase<C> setMaxAttempts(int maxAttempts) {
     this.maxAttempts = maxAttempts;
     return this;
   }
 
   @Override
-  public AsyncTableBuilderBase<T> setStartLogErrorsCnt(int startLogErrorsCnt) {
+  public AsyncTableBuilderBase<C> setStartLogErrorsCnt(int startLogErrorsCnt) {
     this.startLogErrorsCnt = startLogErrorsCnt;
     return this;
   }

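The setMaxRetries default in AsyncTableBuilder delegates to retries2Attempts, i.e. attempts = retries + 1 (the first try plus the retries). A sketch of that helper under this assumption (the real one lives in ConnectionUtils and may differ in detail):

    // Sketch: convert a retry count to an attempt count, guarding against overflow.
    static int retries2Attempts(int retries) {
      return Math.max(1, retries == Integer.MAX_VALUE ? Integer.MAX_VALUE : retries + 1);
    }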
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index ae43f5b..c8553c6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -19,34 +19,37 @@ package org.apache.hadoop.hbase.client;
 
 import static java.util.stream.Collectors.toList;
 
+import com.google.protobuf.RpcChannel;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * The implementation of AsyncTable. Based on {@link RawAsyncTable}.
+ * Just a wrapper of {@link RawAsyncTableImpl}. The difference is that users need to provide a
+ * thread pool when constructing this class, and the callback methods registered to the returned
+ * {@link CompletableFuture} will be executed in this thread pool. So usually it is safe for users
+ * to do anything they want in the callbacks without breaking the rpc framework.
  */
 @InterfaceAudience.Private
-class AsyncTableImpl implements AsyncTable {
+class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
 
-  private final RawAsyncTable rawTable;
+  private final AsyncTable<AdvancedScanResultConsumer> rawTable;
 
   private final ExecutorService pool;
 
-  private final long defaultScannerMaxResultSize;
-
-  AsyncTableImpl(AsyncConnectionImpl conn, RawAsyncTable rawTable, ExecutorService pool) {
+  AsyncTableImpl(AsyncConnectionImpl conn, AsyncTable<AdvancedScanResultConsumer> rawTable,
+      ExecutorService pool) {
     this.rawTable = rawTable;
     this.pool = pool;
-    this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
   }
 
   @Override
@@ -172,16 +175,9 @@ class AsyncTableImpl implements AsyncTable {
     return wrap(rawTable.scanAll(scan));
   }
 
-  private long resultSize2CacheSize(long maxResultSize) {
-    // * 2 if possible
-    return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
-  }
-
   @Override
   public ResultScanner getScanner(Scan scan) {
-    return new AsyncTableResultScanner(rawTable, ReflectionUtils.newInstance(scan.getClass(), scan),
-        resultSize2CacheSize(
-          scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
+    return rawTable.getScanner(scan);
   }
 
   private void scan0(Scan scan, ScanResultConsumer consumer) {
@@ -222,4 +218,59 @@ class AsyncTableImpl implements AsyncTable {
   public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
     return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList());
   }
+
+  @Override
+  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
+      ServiceCaller<S, R> callable, byte[] row) {
+    return wrap(rawTable.coprocessorService(stubMaker, callable, row));
+  }
+
+  @Override
+  public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
+      Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
+      CoprocessorCallback<R> callback) {
+    CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() {
+
+      @Override
+      public void onRegionComplete(RegionInfo region, R resp) {
+        pool.execute(() -> callback.onRegionComplete(region, resp));
+      }
+
+      @Override
+      public void onRegionError(RegionInfo region, Throwable error) {
+        pool.execute(() -> callback.onRegionError(region, error));
+      }
+
+      @Override
+      public void onComplete() {
+        pool.execute(() -> callback.onComplete());
+      }
+
+      @Override
+      public void onError(Throwable error) {
+        pool.execute(() -> callback.onError(error));
+      }
+    };
+    CoprocessorServiceBuilder<S, R> builder =
+      rawTable.coprocessorService(stubMaker, callable, wrappedCallback);
+    return new CoprocessorServiceBuilder<S, R>() {
+
+      @Override
+      public CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive) {
+        builder.fromRow(startKey, inclusive);
+        return this;
+      }
+
+      @Override
+      public CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive) {
+        builder.toRow(endKey, inclusive);
+        return this;
+      }
+
+      @Override
+      public void execute() {
+        builder.execute();
+      }
+    };
+  }
 }

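The re-dispatching that the new class Javadoc describes is the point of the wrap calls above; roughly, it looks like this (a sketch, the actual private helper may differ):

    // Sketch: complete a user-visible future on the user-supplied pool, so callbacks
    // registered on it never run on the rpc framework thread.
    private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
      CompletableFuture<T> wrapped = new CompletableFuture<>();
      future.whenComplete((result, error) -> pool.execute(() -> {
        if (error != null) {
          wrapped.completeExceptionally(error);
        } else {
          wrapped.complete(result);
        }
      }));
      return wrapped;
    }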
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
index 957f06f..fe9645a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
@@ -37,11 +37,11 @@ import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
  * {@code 2 * scan.getMaxResultSize()}.
  */
 @InterfaceAudience.Private
-class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
+class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsumer {
 
   private static final Log LOG = LogFactory.getLog(AsyncTableResultScanner.class);
 
-  private final RawAsyncTable rawTable;
+  private final AsyncTable<AdvancedScanResultConsumer> rawTable;
 
   private final long maxCacheSize;
 
@@ -59,7 +59,8 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
 
   private ScanResumer resumer;
 
-  public AsyncTableResultScanner(RawAsyncTable table, Scan scan, long maxCacheSize) {
+  public AsyncTableResultScanner(AsyncTable<AdvancedScanResultConsumer> table, Scan scan,
+      long maxCacheSize) {
     this.rawTable = table;
     this.maxCacheSize = maxCacheSize;
     this.scan = scan;
@@ -74,8 +75,8 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
   private void stopPrefetch(ScanController controller) {
     if (LOG.isDebugEnabled()) {
       LOG.debug(String.format("0x%x", System.identityHashCode(this)) +
-          " stop prefetching when scanning " + rawTable.getName() + " as the cache size " +
-          cacheSize + " is greater than the maxCacheSize " + maxCacheSize);
+        " stop prefetching when scanning " + rawTable.getName() + " as the cache size " +
+        cacheSize + " is greater than the maxCacheSize " + maxCacheSize);
     }
     resumer = controller.suspend();
   }

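The suspend/resume flow control used by stopPrefetch above is part of the AdvancedScanResultConsumer contract and can be used directly; a hedged sketch of a consumer applying backpressure (buffer, bufferTooLarge and scheduleDrain are illustrative helpers):

    table.scan(new Scan(), new AdvancedScanResultConsumer() {

      @Override
      public void onNext(Result[] results, ScanController controller) {
        buffer(results);                              // illustrative: queue the results
        if (bufferTooLarge()) {                       // illustrative pressure check
          ScanResumer resumer = controller.suspend(); // stop prefetching
          scheduleDrain(() -> resumer.resume());      // illustrative: resume after draining
        }
      }

      @Override
      public void onError(Throwable error) {
        // the scan is terminated; surface the failure
      }

      @Override
      public void onComplete() {
        // all matching results have been delivered
      }
    });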
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index bc0ade2..780dcf9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -21,9 +21,6 @@ import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
 import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
-
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.net.InetAddress;
@@ -41,26 +38,28 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.DNS;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.net.DNS;
 
 /**
  * Utility used by client connections.
@@ -378,7 +377,7 @@ public final class ConnectionUtils {
     }
   }
 
-  static boolean noMoreResultsForScan(Scan scan, HRegionInfo info) {
+  static boolean noMoreResultsForScan(Scan scan, RegionInfo info) {
     if (isEmptyStopRow(info.getEndKey())) {
       return true;
     }
@@ -392,7 +391,7 @@ public final class ConnectionUtils {
     return c > 0 || (c == 0 && !scan.includeStopRow());
   }
 
-  static boolean noMoreResultsForReverseScan(Scan scan, HRegionInfo info) {
+  static boolean noMoreResultsForReverseScan(Scan scan, RegionInfo info) {
     if (isEmptyStartRow(info.getStartKey())) {
       return true;
     }

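For reference, the forward-scan stop check touched above compares the current region's end key with the scan's stop row; a simplified sketch of the comparison (the real method also handles an empty stop row):

    // Sketch: a forward scan is done once the region's end key reaches or passes the
    // stop row; an empty region end key marks the last region of the table.
    static boolean noMoreResults(byte[] regionEndKey, byte[] stopRow, boolean includeStopRow) {
      if (regionEndKey.length == 0) {
        return true;
      }
      int c = Bytes.compareTo(regionEndKey, stopRow);
      return c > 0 || (c == 0 && !includeStopRow);
    }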
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index bcf581b..6366cf0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -19,6 +19,11 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcChannel;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -63,7 +68,6 @@ import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
-import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable;
 import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.client.replication.TableCFs;
@@ -83,6 +87,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
 import org.apache.yetus.audience.InterfaceAudience;
+
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
 import org.apache.hadoop.hbase.shaded.io.netty.util.HashedWheelTimer;
 import org.apache.hadoop.hbase.shaded.io.netty.util.Timeout;
@@ -245,11 +250,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Updat
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcChannel;
-
 /**
  * The implementation of AsyncAdmin.
  * <p>
@@ -263,7 +263,7 @@ import com.google.protobuf.RpcChannel;
  * @see AsyncConnection#getAdminBuilder()
  */
 @InterfaceAudience.Private
-public class RawAsyncHBaseAdmin implements AsyncAdmin {
+class RawAsyncHBaseAdmin implements AsyncAdmin {
   public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
 
   private static final Log LOG = LogFactory.getLog(AsyncHBaseAdmin.class);
@@ -272,7 +272,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   private final HashedWheelTimer retryTimer;
 
-  private final RawAsyncTable metaTable;
+  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
 
   private final long rpcTimeoutNs;
 
@@ -290,7 +290,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
       AsyncAdminBuilderBase builder) {
     this.connection = connection;
     this.retryTimer = retryTimer;
-    this.metaTable = connection.getRawTable(META_TABLE_NAME);
+    this.metaTable = connection.getTable(META_TABLE_NAME);
     this.rpcTimeoutNs = builder.rpcTimeoutNs;
     this.operationTimeoutNs = builder.operationTimeoutNs;
     this.pauseNs = builder.pauseNs;
@@ -1442,8 +1442,8 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
   public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
     CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
     Scan scan = QuotaTableUtil.makeScan(filter);
-    this.connection.getRawTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
-        .scan(scan, new RawScanResultConsumer() {
+    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
+        .scan(scan, new AdvancedScanResultConsumer() {
           List<QuotaSettings> settings = new ArrayList<>();
 
           @Override
@@ -3001,7 +3001,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   @Override
   public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
-      CoprocessorCallable<S, R> callable) {
+      ServiceCaller<S, R> callable) {
     MasterCoprocessorRpcChannelImpl channel =
         new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
     S stub = stubMaker.apply(channel);
@@ -3019,7 +3019,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   @Override
   public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
-      CoprocessorCallable<S, R> callable, ServerName serverName) {
+      ServiceCaller<S, R> callable, ServerName serverName) {
     RegionServerCoprocessorRpcChannelImpl channel =
         new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName(
           serverName));

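The quota scan above accumulates rows with an AdvancedScanResultConsumer; the general shape of such a collecting consumer is roughly this (a sketch mirroring the scanAll implementation later in this commit):

    // Sketch: collect every result and complete a future when the scan finishes.
    CompletableFuture<List<Result>> future = new CompletableFuture<>();
    List<Result> rows = new ArrayList<>();
    table.scan(scan, new AdvancedScanResultConsumer() {

      @Override
      public void onNext(Result[] results, ScanController controller) {
        Collections.addAll(rows, results);
      }

      @Override
      public void onError(Throwable error) {
        future.completeExceptionally(error);
      }

      @Override
      public void onComplete() {
        future.complete(rows);
      }
    });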
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
deleted file mode 100644
index 102f279..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-
-import org.apache.yetus.audience.InterfaceAudience;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
-
-/**
- * A low level asynchronous table.
- * <p>
- * The implementation is required to be thread safe.
- * <p>
- * The returned {@code CompletableFuture} will be finished directly in the rpc framework's callback
- * thread, so typically you should not do any time-consuming work inside these methods; otherwise
- * you are likely to block at least one connection to the RS (even more if the rpc framework uses
- * NIO).
- * <p>
- * So, only experts who want to build a high performance service should use this interface
- * directly, especially for the {@link #scan(Scan, RawScanResultConsumer)} below.
- * @since 2.0.0
- */
-@InterfaceAudience.Public
-public interface RawAsyncTable extends AsyncTableBase {
-
-  /**
-   * The basic scan API uses the observer pattern. All results that match the given scan object will
-   * be passed to the given {@code consumer} by calling {@code RawScanResultConsumer.onNext}.
-   * {@code RawScanResultConsumer.onComplete} means the scan is finished, and
-   * {@code RawScanResultConsumer.onError} means we hit an unrecoverable error and the scan is
-   * terminated. {@code RawScanResultConsumer.onHeartbeat} means the RS is still working but we
-   * cannot get a valid result to call {@code RawScanResultConsumer.onNext}. This is usually because
-   * the matched results are too sparse, for example, a filter which almost filters out everything
-   * is specified.
-   * <p>
-   * Notice that, the methods of the given {@code consumer} will be called directly in the rpc
-   * framework's callback thread, so typically you should not do any time-consuming work inside
-   * these methods; otherwise you are likely to block at least one connection to the RS (even more
-   * if the rpc framework uses NIO).
-   * @param scan A configured {@link Scan} object.
-   * @param consumer the consumer used to receive results.
-   */
-  void scan(Scan scan, RawScanResultConsumer consumer);
-
-  /**
-   * Delegate to a protobuf rpc call.
-   * <p>
-   * Usually, it is just a simple lambda expression, like:
-   *
-   * <pre>
-   * <code>
-   * (stub, controller, rpcCallback) -> {
-   *   XXXRequest request = ...; // prepare the request
-   *   stub.xxx(controller, request, rpcCallback);
-   * }
-   * </code>
-   * </pre>
-   *
-   * And if you can prepare the {@code request} before calling the coprocessorService method, the
-   * lambda expression will be:
-   *
-   * <pre>
-   * <code>
-   * (stub, controller, rpcCallback) -> stub.xxx(controller, request, rpcCallback)
-   * </code>
-   * </pre>
-   */
-  @InterfaceAudience.Public
-  @FunctionalInterface
-  interface CoprocessorCallable<S, R> {
-
-    /**
-     * Represent the actual protobuf rpc call.
-     * @param stub the asynchronous stub
-     * @param controller the rpc controller, has already been prepared for you
-     * @param rpcCallback the rpc callback, has already been prepared for you
-     */
-    void call(S stub, RpcController controller, RpcCallback<R> rpcCallback);
-  }
-
-  /**
-   * Execute the given coprocessor call on the region which contains the given {@code row}.
-   * <p>
-   * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
-   * one line lambda expression, like:
-   *
-   * <pre>
-   * <code>
-   * channel -> xxxService.newStub(channel)
-   * </code>
-   * </pre>
-   *
-   * @param stubMaker a delegation to the actual {@code newStub} call.
-   * @param callable a delegation to the actual protobuf rpc call. See the comment of
-   *          {@link CoprocessorCallable} for more details.
-   * @param row The row key used to identify the remote region location
-   * @param <S> the type of the asynchronous stub
-   * @param <R> the type of the return value
-   * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
-   * @see CoprocessorCallable
-   */
-  <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
-      CoprocessorCallable<S, R> callable, byte[] row);
-
-  /**
-   * The callback when we want to execute a coprocessor call on a range of regions.
-   * <p>
-   * As the locating itself also takes some time, the implementation may want to send rpc calls on
-   * the fly, which means we do not know how many regions we have when we get the return value of
-   * the rpc calls, so we need an {@link #onComplete()} which is used to tell you that we have
-   * passed all the return values to you (through the {@link #onRegionComplete(RegionInfo, Object)}
-   * or {@link #onRegionError(RegionInfo, Throwable)} calls), i.e., there will be no
-   * {@link #onRegionComplete(RegionInfo, Object)} or {@link #onRegionError(RegionInfo, Throwable)}
-   * calls in the future.
-   * <p>
-   * Here is some pseudo code describing a typical implementation of a range coprocessor service
-   * method, to help you better understand how the {@link CoprocessorCallback} will be called. The
-   * {@code callback} in the pseudo code is our {@link CoprocessorCallback}. And notice that the
-   * {@code whenComplete} is {@code CompletableFuture.whenComplete}.
-   *
-   * <pre>
-   * locateThenCall(byte[] row) {
-   *   locate(row).whenComplete((location, locateError) -> {
-   *     if (locateError != null) {
-   *       callback.onError(locateError);
-   *       return;
-   *     }
-   *     incPendingCall();
-   *     region = location.getRegion();
-   *     if (region.getEndKey() > endKey) {
-   *       locateEnd = true;
-   *     } else {
-   *       locateThenCall(region.getEndKey());
-   *     }
-   *     sendCall().whenComplete((resp, error) -> {
-   *       if (error != null) {
-   *         callback.onRegionError(region, error);
-   *       } else {
-   *         callback.onRegionComplete(region, resp);
-   *       }
-   *       if (locateEnd && decPendingCallAndGet() == 0) {
-   *         callback.onComplete();
-   *       }
-   *     });
-   *   });
-   * }
-   * </pre>
-   */
-  @InterfaceAudience.Public
-  interface CoprocessorCallback<R> {
-
-    /**
-     * @param region the region that the response belongs to
-     * @param resp the response of the coprocessor call
-     */
-    void onRegionComplete(RegionInfo region, R resp);
-
-    /**
-     * @param region the region that the error belongs to
-     * @param error the response error of the coprocessor call
-     */
-    void onRegionError(RegionInfo region, Throwable error);
-
-    /**
-     * Indicate that all responses of the regions have been notified by calling
-     * {@link #onRegionComplete(RegionInfo, Object)} or
-     * {@link #onRegionError(RegionInfo, Throwable)}.
-     */
-    void onComplete();
-
-    /**
-     * Indicate that we got an error which does not belong to any regions. Usually a locating error.
-     */
-    void onError(Throwable error);
-  }
-
-  /**
-   * Helper class for sending coprocessorService request that executes a coprocessor call on regions
-   * which are covered by a range.
-   * <p>
-   * If {@code fromRow} is not specified the selection will start with the first table region. If
-   * {@code toRow} is not specified the selection will continue through the last table region.
-   * @param <S> the type of the protobuf Service you want to call.
-   * @param <R> the type of the return value.
-   */
-  interface CoprocessorServiceBuilder<S, R> {
-
-    /**
-     * @param startKey start region selection with region containing this row, inclusive.
-     */
-    default CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey) {
-      return fromRow(startKey, true);
-    }
-
-    /**
-     * @param startKey start region selection with region containing this row
-     * @param inclusive whether to include the startKey
-     */
-    CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive);
-
-    /**
-     * @param endKey select regions up to the region containing this row; the endKey itself is exclusive.
-     */
-    default CoprocessorServiceBuilder<S, R> toRow(byte[] endKey) {
-      return toRow(endKey, false);
-    }
-
-    /**
-     * @param endKey select regions up to and including the region containing this row
-     * @param inclusive whether to include the endKey
-     */
-    CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive);
-
-    /**
-     * Execute the coprocessorService request. You can get the response through the
-     * {@link CoprocessorCallback}.
-     */
-    void execute();
-  }
-
-  /**
-   * Execute a coprocessor call on the regions which are covered by a range.
-   * <p>
-   * Use the returned {@link CoprocessorServiceBuilder} to construct your request and then execute it.
-   * <p>
-   * The {@code stubMaker} is just a delegation to the {@code xxxService.newStub} call. Usually it
-   * is only a one line lambda expression, like:
-   *
-   * <pre>
-   * <code>
-   * channel -> xxxService.newStub(channel)
-   * </code>
-   * </pre>
-   *
-   * @param stubMaker a delegation to the actual {@code newStub} call.
-   * @param callable a delegation to the actual protobuf rpc call. See the comment of
-   *          {@link CoprocessorCallable} for more details.
-   * @param callback callback to get the response. See the comment of {@link CoprocessorCallback}
-   *          for more details.
-   */
-  <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker,
-      CoprocessorCallable<S, R> callable, CoprocessorCallback<R> callback);
-}

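After the merge this surface lives on AsyncTable, with CoprocessorCallable renamed to ServiceCaller (see RawAsyncTableImpl below). A hedged sketch of a single-region call, where SumService and its request/response messages stand in for a real generated protobuf service:

    // Sketch: invoke a coprocessor endpoint on the region containing `row`.
    CompletableFuture<SumResponse> future = table.coprocessorService(
        SumService::newStub,                                             // channel -> stub
        (stub, controller, done) -> stub.sum(controller, request, done), // the rpc itself
        row);
    future.thenAccept(resp -> System.out.println("sum = " + resp.getSum()));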
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index d4de573..07a2b92 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -62,9 +62,16 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType
 
 /**
  * The implementation of RawAsyncTable.
+ * <p>
+ * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
+ * be finished inside the rpc framework thread, which means that the callbacks registered to the
+ * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who
+ * use this class should not try to do time-consuming tasks in the callbacks.
+ * @since 2.0.0
+ * @see AsyncTableImpl
  */
 @InterfaceAudience.Private
-class RawAsyncTableImpl implements RawAsyncTable {
+class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
   private final AsyncConnectionImpl conn;
 
@@ -102,7 +109,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
     this.maxAttempts = builder.maxAttempts;
     this.startLogErrorsCnt = builder.startLogErrorsCnt;
     this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching()
-        : conn.connConf.getScannerCaching();
+      : conn.connConf.getScannerCaching();
     this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
   }
 
@@ -270,7 +277,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
     @Override
     public CheckAndMutateBuilder qualifier(byte[] qualifier) {
       this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" +
-          " an empty byte array, or just do not call this method if you want a null qualifier");
+        " an empty byte array, or just do not call this method if you want a null qualifier");
       return this;
     }
 
@@ -290,7 +297,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
 
     private void preCheck() {
       Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" +
-          " calling ifNotExists/ifEquals/ifMatches before executing the request");
+        " calling ifNotExists/ifEquals/ifMatches before executing the request");
     }
 
     @Override
@@ -354,14 +361,12 @@ class RawAsyncTableImpl implements RawAsyncTable {
           } else {
             try {
               org.apache.hadoop.hbase.client.MultiResponse multiResp =
-                  ResponseConverter.getResults(req, resp, controller.cellScanner());
+                ResponseConverter.getResults(req, resp, controller.cellScanner());
               Throwable ex = multiResp.getException(regionName);
               if (ex != null) {
-                future
-                    .completeExceptionally(ex instanceof IOException ? ex
-                        : new IOException(
-                            "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()),
-                            ex));
+                future.completeExceptionally(ex instanceof IOException ? ex
+                  : new IOException(
+                      "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
               } else {
                 future.complete(respConverter
                     .apply((Result) multiResp.getResults().get(regionName).result.get(0)));
@@ -400,11 +405,28 @@ class RawAsyncTableImpl implements RawAsyncTable {
     return newScan;
   }
 
+  public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
+    new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, pauseNs,
+        maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
+  }
+
+  private long resultSize2CacheSize(long maxResultSize) {
+    // Double the max result size for the cache, unless doubling would overflow.
+    return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
+  }
+
+  @Override
+  public ResultScanner getScanner(Scan scan) {
+    return new AsyncTableResultScanner(this, ReflectionUtils.newInstance(scan.getClass(), scan),
+        resultSize2CacheSize(
+          scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
+  }
+
   @Override
   public CompletableFuture<List<Result>> scanAll(Scan scan) {
     CompletableFuture<List<Result>> future = new CompletableFuture<>();
     List<Result> scanResults = new ArrayList<>();
-    scan(scan, new RawScanResultConsumer() {
+    scan(scan, new AdvancedScanResultConsumer() {
 
       @Override
       public void onNext(Result[] results, ScanController controller) {
@@ -424,11 +446,6 @@ class RawAsyncTableImpl implements RawAsyncTable {
     return future;
   }
 
-  public void scan(Scan scan, RawScanResultConsumer consumer) {
-    new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, pauseNs,
-        maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
-  }
-
   @Override
   public List<CompletableFuture<Result>> get(List<Get> gets) {
     return batch(gets, readRpcTimeoutNs);
@@ -487,7 +504,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
   }
 
   private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
-      CoprocessorCallable<S, R> callable, RegionInfo region, byte[] row) {
+      ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
     RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
         region, row, rpcTimeoutNs, operationTimeoutNs);
     S stub = stubMaker.apply(channel);
@@ -505,7 +522,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
 
   @Override
   public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
-      CoprocessorCallable<S, R> callable, byte[] row) {
+      ServiceCaller<S, R> callable, byte[] row) {
     return coprocessorService(stubMaker, callable, null, row);
   }
 
@@ -527,7 +544,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
   }
 
   private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
-      CoprocessorCallable<S, R> callable, CoprocessorCallback<R> callback,
+      ServiceCaller<S, R> callable, CoprocessorCallback<R> callback,
       List<HRegionLocation> locs, byte[] endKey, boolean endKeyInclusive,
       AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, HRegionLocation loc,
       Throwable error) {
@@ -563,7 +580,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
 
     private final Function<RpcChannel, S> stubMaker;
 
-    private final CoprocessorCallable<S, R> callable;
+    private final ServiceCaller<S, R> callable;
 
     private final CoprocessorCallback<R> callback;
 
@@ -576,7 +593,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
     private boolean endKeyInclusive;
 
     public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker,
-        CoprocessorCallable<S, R> callable, CoprocessorCallback<R> callback) {
+        ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) {
       this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null");
       this.callable = Preconditions.checkNotNull(callable, "callable is null");
       this.callback = Preconditions.checkNotNull(callback, "callback is null");
@@ -586,8 +603,8 @@ class RawAsyncTableImpl implements RawAsyncTable {
     public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) {
       this.startKey = Preconditions.checkNotNull(startKey,
         "startKey is null. Consider using" +
-            " an empty byte array, or just do not call this method if you want to start selection" +
-            " from the first region");
+          " an empty byte array, or just do not call this method if you want to start selection" +
+          " from the first region");
       this.startKeyInclusive = inclusive;
       return this;
     }
@@ -596,8 +613,8 @@ class RawAsyncTableImpl implements RawAsyncTable {
     public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) {
       this.endKey = Preconditions.checkNotNull(endKey,
         "endKey is null. Consider using" +
-            " an empty byte array, or just do not call this method if you want to continue" +
-            " selection to the last region");
+          " an empty byte array, or just do not call this method if you want to continue" +
+          " selection to the last region");
       this.endKeyInclusive = inclusive;
       return this;
     }
@@ -614,7 +631,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
 
   @Override
   public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
-      Function<RpcChannel, S> stubMaker, CoprocessorCallable<S, R> callable,
+      Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
       CoprocessorCallback<R> callback) {
     return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
   }
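
The thread-affinity warning added to the class comment above is the practical reason AsyncTableImpl still exists as a wrapper. Callers that stay on the raw flavor should push heavy work off the rpc framework thread themselves. A minimal sketch, assuming an already-created AsyncConnection named conn and an illustrative table "test":

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.util.Bytes;

public final class RawCallbackExample {

  private static final ExecutorService WORKER = Executors.newSingleThreadExecutor();

  static void getAsync(AsyncConnection conn) {
    // Without an executor argument, getTable returns the 'raw' flavor whose
    // futures complete on the rpc framework thread.
    AsyncTable<AdvancedScanResultConsumer> table = conn.getTable(TableName.valueOf("test"));
    table.get(new Get(Bytes.toBytes("row")))
        .whenCompleteAsync((result, error) -> {
          // Runs on WORKER, not on the rpc framework thread, so it may block.
          if (error != null) {
            error.printStackTrace();
          } else {
            System.out.println("got: " + result);
          }
        }, WORKER);
  }
}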

http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
deleted file mode 100644
index 7ab02d8..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.util.Optional;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
-
-/**
- * Receives {@link Result} for an asynchronous scan.
- * <p>
- * Notice that the {@link #onNext(Result[], ScanController)} method will be called in the thread
- * from which we send requests to the HBase service. So if you want the asynchronous scanner to
- * fetch data from HBase in the background while you process the returned data, you need to move
- * the processing work to another thread so that the {@code onNext} call returns immediately. And
- * please do NOT do any time-consuming tasks in any of the methods below unless you know what you
- * are doing.
- * @since 2.0.0
- */
-@InterfaceAudience.Public
-public interface RawScanResultConsumer {
-
-  /**
-   * Used to resume a scan.
-   */
-  @InterfaceAudience.Public
-  interface ScanResumer {
-
-    /**
-     * Resume the scan. You are free to call it multiple times, but only the first call will take
-     * effect.
-     */
-    void resume();
-  }
-
-  /**
-   * Used to suspend or stop a scan, or get a scan cursor if available.
-   * <p>
-   * Notice that you should only call {@link #suspend()} or {@link #terminate()} inside the onNext
-   * or onHeartbeat method. An IllegalStateException will be thrown if you call them elsewhere.
-   * <p>
-   * You can only call one of the {@link #suspend()} and {@link #terminate()} methods (of course
-   * you are free to call neither), and the methods are not reentrant. An IllegalStateException
-   * will be thrown if you have already called one of them.
-   */
-  @InterfaceAudience.Public
-  interface ScanController {
-
-    /**
-     * Suspend the scan.
-     * <p>
-     * This means we will stop fetching data in the background, i.e., we will not call onNext
-     * again until you resume the scan.
-     * @return A resumer used to resume the scan later.
-     */
-    ScanResumer suspend();
-
-    /**
-     * Terminate the scan.
-     * <p>
-     * This is useful when you have gotten enough results and want to stop the scan in the onNext
-     * method, or when you want to stop the scan in the onHeartbeat method because it has taken
-     * too much time.
-     */
-    void terminate();
-
-    /**
-     * Get the scan cursor if available.
-     * @return The scan cursor.
-     */
-    Optional<Cursor> cursor();
-  }
-
-  /**
-   * Indicate that we have received some data.
-   * @param results the data fetched from the HBase service.
-   * @param controller used to suspend or terminate the scan. Notice that the {@code controller}
-   *          instance is only valid within the scope of the onNext method. You can only call its
-   *          methods in onNext; do NOT store it and call it later outside onNext.
-   */
-  void onNext(Result[] results, ScanController controller);
-
-  /**
-   * Indicate that there is a heartbeat message but we have not accumulated enough cells to call
-   * {@link #onNext(Result[], ScanController)}.
-   * <p>
-   * Note that this method will always be called when the RS returns something to us but we do not
-   * have enough cells to call {@link #onNext(Result[], ScanController)}. Sometimes it may not be a
-   * 'heartbeat' message from the RS's point of view; for example, we may have a large row with
-   * many cells where the size limit is exceeded before all the cells for this row have been sent.
-   * The RS does send some data to us and the time limit has not been reached, but we cannot return
-   * the data to the client yet, so we call this method to tell the client we have already received
-   * something.
-   * <p>
-   * This method gives you a chance to terminate a slow scan operation.
-   * @param controller used to suspend or terminate the scan. Notice that the {@code controller}
-   *          instance is only valid within the scope of the onHeartbeat method. You can only call
-   *          its methods in onHeartbeat; do NOT store it and call it later outside onHeartbeat.
-   */
-  default void onHeartbeat(ScanController controller) {
-  }
-
-  /**
-   * Indicate that we hit an unrecoverable error and the scan operation is terminated.
-   * <p>
-   * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}.
-   */
-  void onError(Throwable error);
-
-  /**
-   * Indicate that the scan operation is completed normally.
-   */
-  void onComplete();
-
-  /**
-   * If {@code scan.isScanMetricsEnabled()} returns true, then this method will be called prior to
-   * all other methods in this interface to give you the {@link ScanMetrics} instance for this scan
-   * operation. The {@link ScanMetrics} instance will be updated on-the-fly during the scan; you
-   * can store it somewhere to get the metrics at any time if you want.
-   */
-  default void onScanMetricsCreated(ScanMetrics scanMetrics) {
-  }
-}
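
The interface deleted above survives as AdvancedScanResultConsumer; the callers rewritten earlier in this patch suggest the onNext/ScanController contract carries over unchanged. A hedged sketch of a consumer that uses the suspend/resume protocol for backpressure, assuming the nested ScanController and ScanResumer types keep the semantics documented here:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class ThrottledConsumer implements AdvancedScanResultConsumer {

  private final ExecutorService worker = Executors.newSingleThreadExecutor();

  @Override
  public void onNext(Result[] results, ScanController controller) {
    // suspend() may only be called inside onNext/onHeartbeat; it stops background
    // fetching so this method can return immediately.
    ScanResumer resumer = controller.suspend();
    worker.execute(() -> {
      for (Result result : results) {
        System.out.println(Bytes.toString(result.getRow())); // stand-in for real processing
      }
      resumer.resume(); // fetch the next batch only after this one is drained
    });
  }

  @Override
  public void onError(Throwable error) {
    error.printStackTrace();
    worker.shutdown();
  }

  @Override
  public void onComplete() {
    worker.shutdown();
  }
}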

http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java
index 826a8ef..be3108b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java
@@ -18,38 +18,20 @@
 package org.apache.hadoop.hbase.client;
 
 import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 
 /**
  * Receives {@link Result} for an asynchronous scan.
+ * <p>
+ * All results that match the given scan object will be passed to this class by calling
+ * {@link #onNext(Result)}. {@link #onComplete()} means the scan is finished, and
+ * {@link #onError(Throwable)} means we hit an unrecoverable error and the scan is terminated.
  */
 @InterfaceAudience.Public
-public interface ScanResultConsumer {
+public interface ScanResultConsumer extends ScanResultConsumerBase {
 
   /**
    * @param result the data fetched from HBase service.
    * @return {@code false} if you want to terminate the scan process. Otherwise {@code true}
    */
   boolean onNext(Result result);
-
-  /**
-   * Indicate that we hit an unrecoverable error and the scan operation is terminated.
-   * <p>
-   * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}.
-   */
-  void onError(Throwable error);
-
-  /**
-   * Indicate that the scan operation is completed normally.
-   */
-  void onComplete();
-
-  /**
-   * If {@code scan.isScanMetricsEnabled()} returns true, then this method will be called prior to
-   * all other methods in this interface to give you the {@link ScanMetrics} instance for this scan
-   * operation. The {@link ScanMetrics} instance will be updated on-the-fly during the scan; you
-   * can store it somewhere to get the metrics at any time if you want.
-   */
-  default void onScanMetricsCreated(ScanMetrics scanMetrics) {
-  }
 }
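
With the common callbacks pulled up into ScanResultConsumerBase (next file), implementing the non-raw consumer reduces to the single onNext above. A minimal sketch; the row handling and cutoff are illustrative only:

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ScanResultConsumer;
import org.apache.hadoop.hbase.util.Bytes;

public class FirstTenRowsConsumer implements ScanResultConsumer {

  private int count;

  @Override
  public boolean onNext(Result result) {
    System.out.println(Bytes.toString(result.getRow()));
    return ++count < 10; // returning false terminates the scan early
  }

  @Override
  public void onError(Throwable error) { // declared in ScanResultConsumerBase
    error.printStackTrace();
  }

  @Override
  public void onComplete() {
    System.out.println("scan finished");
  }
}

Such a consumer is fed by the buffered table flavor, i.e. the AsyncTable obtained from an AsyncConnection together with an executor.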

http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumerBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumerBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumerBase.java
new file mode 100644
index 0000000..538cf9d
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumerBase.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The base interface for scan result consumers.
+ */
+@InterfaceAudience.Public
+public interface ScanResultConsumerBase {
+  /**
+   * Indicate that we hit an unrecoverable error and the scan operation is terminated.
+   * <p>
+   * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}.
+   */
+  void onError(Throwable error);
+
+  /**
+   * Indicate that the scan operation is completed normally.
+   */
+  void onComplete();
+
+  /**
+   * If {@code scan.isScanMetricsEnabled()} returns true, then this method will be called prior to
+   * all other methods in this interface to give you the {@link ScanMetrics} instance for this scan
+   * operation. The {@link ScanMetrics} instance will be updated on-the-fly during the scan; you
+   * can store it somewhere to get the metrics at any time if you want.
+   */
+  default void onScanMetricsCreated(ScanMetrics scanMetrics) {
+  }
+}
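
Because onScanMetricsCreated is guaranteed to run before any other callback, a consumer can capture the ScanMetrics reference once and read it later. A small sketch, assuming ScanMetrics#getMetricsMap() as the read-out (any of its counters would do):

import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ScanResultConsumer;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;

public class MetricsLoggingConsumer implements ScanResultConsumer {

  private final AtomicReference<ScanMetrics> metrics = new AtomicReference<>();

  @Override
  public void onScanMetricsCreated(ScanMetrics scanMetrics) {
    metrics.set(scanMetrics); // called before onNext/onError/onComplete
  }

  @Override
  public boolean onNext(Result result) {
    return true;
  }

  @Override
  public void onError(Throwable error) {
    error.printStackTrace();
  }

  @Override
  public void onComplete() {
    // Only populated if scan.isScanMetricsEnabled() was true.
    ScanMetrics m = metrics.get();
    if (m != null) {
      System.out.println("scan metrics: " + m.getMetricsMap());
    }
  }
}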

http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServiceCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServiceCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServiceCaller.java
new file mode 100644
index 0000000..467f1a2
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServiceCaller.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Delegate to a protobuf rpc call.
+ * <p>
+ * Usually, it is just a simple lambda expression, like:
+ *
+ * <pre>
+ * <code>
+ * (stub, controller, rpcCallback) -> {
+ *   XXXRequest request = ...; // prepare the request
+ *   stub.xxx(controller, request, rpcCallback);
+ * }
+ * </code>
+ * </pre>
+ *
+ * And if you already have the {@code request}, the lambda expression will be:
+ *
+ * <pre>
+ * <code>
+ * (stub, controller, rpcCallback) -> stub.xxx(controller, request, rpcCallback)
+ * </code>
+ * </pre>
+ *
+ * @param <S> the type of the protobuf Service you want to call.
+ * @param <R> the type of the return value.
+ */
+@InterfaceAudience.Public
+@FunctionalInterface
+public interface ServiceCaller<S, R> {
+
+  /**
+   * Represent the actual protobuf rpc call.
+   * @param stub the asynchronous stub
+   * @param controller the rpc controller, has already been prepared for you
+   * @param rpcCallback the rpc callback, has already been prepared for you
+   */
+  void call(S stub, RpcController controller, RpcCallback<R> rpcCallback);
+}
\ No newline at end of file
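
In caller code the three arguments line up as below. XXXService, XXXRequest and XXXResponse are hypothetical generated protobuf classes, following the placeholder naming the javadoc above already uses; only the shape of the call is the point:

import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.util.Bytes;

public class ServiceCallerUsage {

  // Locates the region containing "row" and invokes the endpoint there.
  static CompletableFuture<XXXResponse> call(AsyncTable<?> table, XXXRequest request) {
    return table.coprocessorService(
        XXXService::newStub,                                             // stubMaker
        (stub, controller, done) -> stub.xxx(controller, request, done), // ServiceCaller<S, R>
        Bytes.toBytes("row"));
  }
}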

http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
index ff9b873..371e865 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
@@ -32,9 +32,9 @@ import java.util.concurrent.CompletableFuture;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.RawAsyncTable;
-import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallback;
-import org.apache.hadoop.hbase.client.RawScanResultConsumer;
+import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.AsyncTable.CoprocessorCallback;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -126,7 +126,7 @@ public class AsyncAggregationClient {
   }
 
   public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R>
-      max(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+      max(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
     CompletableFuture<R> future = new CompletableFuture<>();
     AggregateRequest req;
     try {
@@ -163,7 +163,7 @@ public class AsyncAggregationClient {
   }
 
   public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R>
-      min(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+      min(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
     CompletableFuture<R> future = new CompletableFuture<>();
     AggregateRequest req;
     try {
@@ -201,7 +201,7 @@ public class AsyncAggregationClient {
 
   public static <R, S, P extends Message, Q extends Message, T extends Message>
       CompletableFuture<Long>
-      rowCount(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+      rowCount(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
     CompletableFuture<Long> future = new CompletableFuture<>();
     AggregateRequest req;
     try {
@@ -233,7 +233,7 @@ public class AsyncAggregationClient {
   }
 
   public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<S>
-      sum(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+      sum(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
     CompletableFuture<S> future = new CompletableFuture<>();
     AggregateRequest req;
     try {
@@ -269,7 +269,7 @@ public class AsyncAggregationClient {
 
   public static <R, S, P extends Message, Q extends Message, T extends Message>
       CompletableFuture<Double>
-      avg(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+      avg(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
     CompletableFuture<Double> future = new CompletableFuture<>();
     AggregateRequest req;
     try {
@@ -307,7 +307,7 @@ public class AsyncAggregationClient {
 
   public static <R, S, P extends Message, Q extends Message, T extends Message>
       CompletableFuture<Double>
-      std(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+      std(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
     CompletableFuture<Double> future = new CompletableFuture<>();
     AggregateRequest req;
     try {
@@ -351,7 +351,7 @@ public class AsyncAggregationClient {
   // the map key is the startRow of the region
   private static <R, S, P extends Message, Q extends Message, T extends Message>
       CompletableFuture<NavigableMap<byte[], S>>
-      sumByRegion(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+      sumByRegion(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
     CompletableFuture<NavigableMap<byte[], S>> future =
         new CompletableFuture<NavigableMap<byte[], S>>();
     AggregateRequest req;
@@ -388,8 +388,8 @@ public class AsyncAggregationClient {
   }
 
   private static <R, S, P extends Message, Q extends Message, T extends Message> void findMedian(
-      CompletableFuture<R> future, RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci,
-      Scan scan, NavigableMap<byte[], S> sumByRegion) {
+      CompletableFuture<R> future, AsyncTable<AdvancedScanResultConsumer> table,
+      ColumnInterpreter<R, S, P, Q, T> ci, Scan scan, NavigableMap<byte[], S> sumByRegion) {
     double halfSum = ci.divideForAvg(sumByRegion.values().stream().reduce(ci::add).get(), 2L);
     S movingSum = null;
     byte[] startRow = null;
@@ -410,7 +410,7 @@ public class AsyncAggregationClient {
     NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family);
     byte[] weightQualifier = qualifiers.last();
     byte[] valueQualifier = qualifiers.first();
-    table.scan(scan, new RawScanResultConsumer() {
+    table.scan(scan, new AdvancedScanResultConsumer() {
 
       private S sum = baseSum;
 
@@ -456,8 +456,9 @@ public class AsyncAggregationClient {
     });
   }
 
-  public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R>
-      median(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+  public static <R, S, P extends Message, Q extends Message, T extends Message>
+      CompletableFuture<R> median(AsyncTable<AdvancedScanResultConsumer> table,
+      ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
     CompletableFuture<R> future = new CompletableFuture<>();
     sumByRegion(table, ci, scan).whenComplete((sumByRegion, error) -> {
       if (error != null) {
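
After this change the simple aggregates accept any AsyncTable<?>, while median keeps the AdvancedScanResultConsumer bound because it scans internally. A minimal usage sketch, assuming an existing AsyncConnection named conn, an illustrative table "test" with the AggregateImplementation endpoint loaded, and a scan selecting exactly one long-encoded column:

import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AsyncAggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.util.Bytes;

public class AggregationUsage {

  static CompletableFuture<Long> countRows(AsyncConnection conn) {
    AsyncTable<AdvancedScanResultConsumer> table = conn.getTable(TableName.valueOf("test"));
    Scan scan = new Scan().addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"));
    return AsyncAggregationClient.rowCount(table, new LongColumnInterpreter(), scan);
  }
}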

http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java
index 389aaaf..12e5b8d 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java
@@ -55,7 +55,7 @@ public class TestAsyncAggregationClient {
 
   private static AsyncConnection CONN;
 
-  private static RawAsyncTable TABLE;
+  private static AsyncTable<AdvancedScanResultConsumer> TABLE;
 
   @BeforeClass
   public static void setUp() throws Exception {
@@ -69,7 +69,7 @@ public class TestAsyncAggregationClient {
     }
     UTIL.createTable(TABLE_NAME, CF, splitKeys);
     CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
-    TABLE = CONN.getRawTable(TABLE_NAME);
+    TABLE = CONN.getTable(TABLE_NAME);
     TABLE.putAll(LongStream.range(0, COUNT)
         .mapToObj(l -> new Put(Bytes.toBytes(String.format("%03d", l)))
             .addColumn(CF, CQ, Bytes.toBytes(l)).addColumn(CF, CQ2, Bytes.toBytes(l * l)))

http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java
index 2105547..67aba62 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java
@@ -141,7 +141,7 @@ public class AsyncClientExample extends Configured implements Tool {
           latch.countDown();
           return;
         }
-        AsyncTable table = conn.getTable(tableName, threadPool);
+        AsyncTable<?> table = conn.getTable(tableName, threadPool);
         table.put(new Put(getKey(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(i)))
             .whenComplete((putResp, putErr) -> {
               if (putErr != null) {
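
The wildcard in AsyncTable<?> above works because put and get only need the part of the merged interface common to both consumer types. When the distinction matters, the two getTable overloads pin it down; a hedged sketch of the intended generics, assuming the connection API introduced by this change:

import java.util.concurrent.ExecutorService;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ScanResultConsumer;

public class TableFlavors {

  static void flavors(AsyncConnection conn, ExecutorService pool) {
    TableName tn = TableName.valueOf("test"); // illustrative table name
    // No executor: the 'raw' flavor; callbacks run on the rpc framework thread.
    AsyncTable<AdvancedScanResultConsumer> raw = conn.getTable(tn);
    // With executor: callbacks are dispatched to the pool, as in the example above.
    AsyncTable<ScanResultConsumer> buffered = conn.getTable(tn, pool);
  }
}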

http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java
index bb83bac..e3686f4 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java
@@ -17,9 +17,24 @@
  */
 package org.apache.hadoop.hbase.client.example;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
+import org.apache.hadoop.hbase.util.Bytes;
+
 import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
-
 import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.ServerBootstrap;
 import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBuf;
 import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
@@ -43,26 +58,10 @@ import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.http.HttpVersion;
 import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.http.QueryStringDecoder;
 import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.GlobalEventExecutor;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.AsyncConnection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RawAsyncTable;
-import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
-import org.apache.hadoop.hbase.util.Bytes;
-
 /**
- * A simple example on how to use {@link RawAsyncTable} to write a fully asynchronous HTTP proxy
- * server. The {@link AsyncConnection} will share the same event loop with the HTTP server.
+ * A simple example of how to use {@link org.apache.hadoop.hbase.client.AsyncTable} to write a fully
+ * asynchronous HTTP proxy server. The {@link AsyncConnection} will share the same event loop with
+ * the HTTP server.
  * <p>
  * The request URL is:
  *
@@ -160,7 +159,7 @@ public class HttpProxyExample {
 
     private void get(ChannelHandlerContext ctx, FullHttpRequest req) {
       Params params = parse(req);
-      conn.getRawTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row))
+      conn.getTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row))
           .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier)))
           .whenComplete((r, e) -> {
             if (e != null) {
@@ -181,7 +180,7 @@ public class HttpProxyExample {
       Params params = parse(req);
       byte[] value = new byte[req.content().readableBytes()];
       req.content().readBytes(value);
-      conn.getRawTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row))
+      conn.getTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row))
           .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier), value))
           .whenComplete((r, e) -> {
             if (e != null) {