You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/11/22 06:45:52 UTC

[1/2] hbase git commit: HBASE-17141 Introduce a more user-friendly implementation of AsyncTable

Repository: hbase
Updated Branches:
  refs/heads/master b297f2dae -> 6ff19f94f


http://git-wip-us.apache.org/repos/asf/hbase/blob/6ff19f94/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
new file mode 100644
index 0000000..bf9a3f2
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+/**
+ * The implementation of RawAsyncTable.
+ */
+@InterfaceAudience.Private
+class RawAsyncTableImpl implements RawAsyncTable {
+
+  private static final Log LOG = LogFactory.getLog(RawAsyncTableImpl.class);
+
+  private final AsyncConnectionImpl conn;
+
+  private final TableName tableName;
+
+  private final int defaultScannerCaching;
+
+  private final long defaultScannerMaxResultSize;
+
+  private long readRpcTimeoutNs;
+
+  private long writeRpcTimeoutNs;
+
+  private long operationTimeoutNs;
+
+  private long scanTimeoutNs;
+
+  public RawAsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) {
+    this.conn = conn;
+    this.tableName = tableName;
+    this.readRpcTimeoutNs = conn.connConf.getReadRpcTimeoutNs();
+    this.writeRpcTimeoutNs = conn.connConf.getWriteRpcTimeoutNs();
+    this.operationTimeoutNs = tableName.isSystemTable() ? conn.connConf.getMetaOperationTimeoutNs()
+        : conn.connConf.getOperationTimeoutNs();
+    this.defaultScannerCaching = conn.connConf.getScannerCaching();
+    this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
+    this.scanTimeoutNs = conn.connConf.getScanTimeoutNs();
+  }
+
+  @Override
+  public TableName getName() {
+    return tableName;
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conn.getConfiguration();
+  }
+
+  @FunctionalInterface
+  private interface Converter<D, I, S> {
+    D convert(I info, S src) throws IOException;
+  }
+
+  @FunctionalInterface
+  private interface RpcCall<RESP, REQ> {
+    void call(ClientService.Interface stub, HBaseRpcController controller, REQ req,
+        RpcCallback<RESP> done);
+  }
+
+  private static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call(
+      HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
+      Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall,
+      Converter<RESP, HBaseRpcController, PRESP> respConverter) {
+    CompletableFuture<RESP> future = new CompletableFuture<>();
+    try {
+      rpcCall.call(stub, controller, reqConvert.convert(loc.getRegionInfo().getRegionName(), req),
+        new RpcCallback<PRESP>() {
+
+          @Override
+          public void run(PRESP resp) {
+            if (controller.failed()) {
+              future.completeExceptionally(controller.getFailed());
+            } else {
+              try {
+                future.complete(respConverter.convert(controller, resp));
+              } catch (IOException e) {
+                future.completeExceptionally(e);
+              }
+            }
+          }
+        });
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller,
+      HRegionLocation loc, ClientService.Interface stub, REQ req,
+      Converter<MutateRequest, byte[], REQ> reqConvert,
+      Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
+    return call(controller, loc, stub, req, reqConvert, (s, c, r, done) -> s.mutate(c, r, done),
+      respConverter);
+  }
+
+  private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller,
+      HRegionLocation loc, ClientService.Interface stub, REQ req,
+      Converter<MutateRequest, byte[], REQ> reqConvert) {
+    return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> {
+      return null;
+    });
+  }
+
+  private static Result toResult(HBaseRpcController controller, MutateResponse resp)
+      throws IOException {
+    if (!resp.hasResult()) {
+      return null;
+    }
+    return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner());
+  }
+
+  @FunctionalInterface
+  private interface NoncedConverter<D, I, S> {
+    D convert(I info, S src, long nonceGroup, long nonce) throws IOException;
+  }
+
+  private <REQ, RESP> CompletableFuture<RESP> noncedMutate(HBaseRpcController controller,
+      HRegionLocation loc, ClientService.Interface stub, REQ req,
+      NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
+      Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
+    long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+    long nonce = conn.getNonceGenerator().newNonce();
+    return mutate(controller, loc, stub, req,
+      (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
+  }
+
+  private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long rpcTimeoutNs) {
+    return conn.callerFactory.<T> single().table(tableName).row(row)
+        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS);
+  }
+
+  private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
+    return newCaller(row.getRow(), rpcTimeoutNs);
+  }
+
+  @Override
+  public CompletableFuture<Result> get(Get get) {
+    return this.<Result> newCaller(get, readRpcTimeoutNs)
+        .action((controller, loc, stub) -> RawAsyncTableImpl
+            .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
+              RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
+              (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
+        .call();
+  }
+
+  @Override
+  public CompletableFuture<Void> put(Put put) {
+    return this.<Void> newCaller(put, writeRpcTimeoutNs)
+        .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
+          put, RequestConverter::buildMutateRequest))
+        .call();
+  }
+
+  @Override
+  public CompletableFuture<Void> delete(Delete delete) {
+    return this.<Void> newCaller(delete, writeRpcTimeoutNs)
+        .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
+          stub, delete, RequestConverter::buildMutateRequest))
+        .call();
+  }
+
+  @Override
+  public CompletableFuture<Result> append(Append append) {
+    checkHasFamilies(append);
+    return this.<Result> newCaller(append, writeRpcTimeoutNs)
+        .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub,
+          append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
+        .call();
+  }
+
+  @Override
+  public CompletableFuture<Result> increment(Increment increment) {
+    checkHasFamilies(increment);
+    return this.<Result> newCaller(increment, writeRpcTimeoutNs)
+        .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc,
+          stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
+        .call();
+  }
+
+  @Override
+  public CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+      CompareOp compareOp, byte[] value, Put put) {
+    return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
+        .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc,
+          stub, put,
+          (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
+            new BinaryComparator(value), CompareType.valueOf(compareOp.name()), p),
+          (c, r) -> r.getProcessed()))
+        .call();
+  }
+
+  @Override
+  public CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+      CompareOp compareOp, byte[] value, Delete delete) {
+    return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
+        .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
+          loc, stub, delete,
+          (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
+            new BinaryComparator(value), CompareType.valueOf(compareOp.name()), d),
+          (c, r) -> r.getProcessed()))
+        .call();
+  }
+
+  // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
+  // so here I write a new method as I do not want to change the abstraction of call method.
+  private static <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
+      HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
+      Converter<MultiRequest, byte[], RowMutations> reqConvert,
+      Function<Result, RESP> respConverter) {
+    CompletableFuture<RESP> future = new CompletableFuture<>();
+    try {
+      byte[] regionName = loc.getRegionInfo().getRegionName();
+      MultiRequest req = reqConvert.convert(regionName, mutation);
+      stub.multi(controller, req, new RpcCallback<MultiResponse>() {
+
+        @Override
+        public void run(MultiResponse resp) {
+          if (controller.failed()) {
+            future.completeExceptionally(controller.getFailed());
+          } else {
+            try {
+              org.apache.hadoop.hbase.client.MultiResponse multiResp =
+                  ResponseConverter.getResults(req, resp, controller.cellScanner());
+              Throwable ex = multiResp.getException(regionName);
+              if (ex != null) {
+                future
+                    .completeExceptionally(ex instanceof IOException ? ex
+                        : new IOException(
+                            "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()),
+                            ex));
+              } else {
+                future.complete(respConverter
+                    .apply((Result) multiResp.getResults().get(regionName).result.get(0)));
+              }
+            } catch (IOException e) {
+              future.completeExceptionally(e);
+            }
+          }
+        }
+      });
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  @Override
+  public CompletableFuture<Void> mutateRow(RowMutations mutation) {
+    return this.<Void> newCaller(mutation, writeRpcTimeoutNs).action((controller, loc,
+        stub) -> RawAsyncTableImpl.<Void> mutateRow(controller, loc, stub, mutation, (rn, rm) -> {
+          RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
+          regionMutationBuilder.setAtomic(true);
+          return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
+        }, (resp) -> {
+          return null;
+        })).call();
+  }
+
+  @Override
+  public CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
+      CompareOp compareOp, byte[] value, RowMutations mutation) {
+    return this.<Boolean> newCaller(mutation, writeRpcTimeoutNs)
+        .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
+          stub, mutation,
+          (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
+            new BinaryComparator(value), CompareType.valueOf(compareOp.name()), rm),
+          (resp) -> resp.getExists()))
+        .call();
+  }
+
+  private <T> CompletableFuture<T> failedFuture(Throwable error) {
+    CompletableFuture<T> future = new CompletableFuture<>();
+    future.completeExceptionally(error);
+    return future;
+  }
+
+  private Scan setDefaultScanConfig(Scan scan) {
+    // always create a new scan object as we may reset the start row later.
+    Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
+    if (newScan.getCaching() <= 0) {
+      newScan.setCaching(defaultScannerCaching);
+    }
+    if (newScan.getMaxResultSize() <= 0) {
+      newScan.setMaxResultSize(defaultScannerMaxResultSize);
+    }
+    return newScan;
+  }
+
+  @Override
+  public CompletableFuture<List<Result>> smallScan(Scan scan, int limit) {
+    if (!scan.isSmall()) {
+      return failedFuture(new IllegalArgumentException("Only small scan is allowed"));
+    }
+    if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
+      return failedFuture(
+        new IllegalArgumentException("Batch and allowPartial is not allowed for small scan"));
+    }
+    return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan))
+        .limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
+        .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
+  }
+
+  public void scan(Scan scan, ScanResultConsumer consumer) {
+    if (scan.isSmall()) {
+      if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
+        consumer.onError(
+          new IllegalArgumentException("Batch and allowPartial is not allowed for small scan"));
+      } else {
+        LOG.warn("This is small scan " + scan + ", consider using smallScan directly?");
+      }
+    }
+    scan = setDefaultScanConfig(scan);
+    new AsyncClientScanner(scan, consumer, tableName, conn, scanTimeoutNs, readRpcTimeoutNs)
+        .start();
+  }
+
+  @Override
+  public void setReadRpcTimeout(long timeout, TimeUnit unit) {
+    this.readRpcTimeoutNs = unit.toNanos(timeout);
+  }
+
+  @Override
+  public long getReadRpcTimeout(TimeUnit unit) {
+    return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS);
+  }
+
+  @Override
+  public void setWriteRpcTimeout(long timeout, TimeUnit unit) {
+    this.writeRpcTimeoutNs = unit.toNanos(timeout);
+  }
+
+  @Override
+  public long getWriteRpcTimeout(TimeUnit unit) {
+    return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS);
+  }
+
+  @Override
+  public void setOperationTimeout(long timeout, TimeUnit unit) {
+    this.operationTimeoutNs = unit.toNanos(timeout);
+  }
+
+  @Override
+  public long getOperationTimeout(TimeUnit unit) {
+    return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
+  }
+
+  @Override
+  public void setScanTimeout(long timeout, TimeUnit unit) {
+    this.scanTimeoutNs = unit.toNanos(timeout);
+  }
+
+  @Override
+  public long getScanTimeout(TimeUnit unit) {
+    return TimeUnit.NANOSECONDS.convert(scanTimeoutNs, unit);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ff19f94/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java
index d3efbda..e9cb476 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java
@@ -20,32 +20,90 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 
 /**
- * Interface for client-side scanning.
- * Go to {@link Table} to obtain instances.
+ * Interface for client-side scanning. Go to {@link Table} to obtain instances.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public interface ResultScanner extends Closeable, Iterable<Result> {
 
+  @Override
+  default Iterator<Result> iterator() {
+    return new Iterator<Result>() {
+      // The next RowResult, possibly pre-read
+      Result next = null;
+
+      // return true if there is another item pending, false if there isn't.
+      // this method is where the actual advancing takes place, but you need
+      // to call next() to consume it. hasNext() will only advance if there
+      // isn't a pending next().
+      @Override
+      public boolean hasNext() {
+        if (next != null) {
+          return true;
+        }
+        try {
+          return (next = ResultScanner.this.next()) != null;
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+
+      // get the pending next item and advance the iterator. returns null if
+      // there is no next item.
+      @Override
+      public Result next() {
+        // since hasNext() does the real advancing, we call this to determine
+        // if there is a next before proceeding.
+        if (!hasNext()) {
+          return null;
+        }
+
+        // if we get to here, then hasNext() has given us an item to return.
+        // we want to return the item and then null out the next pointer, so
+        // we use a temporary variable.
+        Result temp = next;
+        next = null;
+        return temp;
+      }
+    };
+  }
+
   /**
    * Grab the next row's worth of values. The scanner will return a Result.
-   * @return Result object if there is another row, null if the scanner is
-   * exhausted.
+   * @return Result object if there is another row, null if the scanner is exhausted.
    * @throws IOException e
    */
   Result next() throws IOException;
 
   /**
+   * Get nbRows rows. How many RPCs are made is determined by the {@link Scan#setCaching(int)}
+   * setting (or hbase.client.scanner.caching in hbase-site.xml).
    * @param nbRows number of rows to return
-   * @return Between zero and nbRows results
-   * @throws IOException e
+   * @return Between zero and nbRows rowResults. Scan is done if returned array is of zero-length
+   *         (We never return null).
+   * @throws IOException
    */
-  Result [] next(int nbRows) throws IOException;
+  default Result[] next(int nbRows) throws IOException {
+    List<Result> resultSets = new ArrayList<>(nbRows);
+    for (int i = 0; i < nbRows; i++) {
+      Result next = next();
+      if (next != null) {
+        resultSets.add(next);
+      } else {
+        break;
+      }
+    }
+    return resultSets.toArray(new Result[0]);
+  }
 
   /**
    * Closes the scanner and releases any resources it has allocated

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ff19f94/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
index a0792ef..220be10 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
@@ -57,11 +57,12 @@ public abstract class AbstractTestAsyncTableScan {
     TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
     ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
-    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME);
     List<CompletableFuture<?>> futures = new ArrayList<>();
-    IntStream.range(0, COUNT).forEach(
-      i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i)))
-          .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))));
+    IntStream.range(0, COUNT)
+        .forEach(i -> futures.add(table.put(
+          new Put(Bytes.toBytes(String.format("%03d", i))).addColumn(FAMILY, CQ1, Bytes.toBytes(i))
+              .addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))));
     CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
   }
 
@@ -73,11 +74,11 @@ public abstract class AbstractTestAsyncTableScan {
 
   protected abstract Scan createScan();
 
-  protected abstract List<Result> doScan(AsyncTable table, Scan scan) throws Exception;
+  protected abstract List<Result> doScan(Scan scan) throws Exception;
 
   @Test
   public void testScanAll() throws Exception {
-    List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME), createScan());
+    List<Result> results = doScan(createScan());
     assertEquals(COUNT, results.size());
     IntStream.range(0, COUNT).forEach(i -> {
       Result result = results.get(i);
@@ -94,7 +95,7 @@ public abstract class AbstractTestAsyncTableScan {
 
   @Test
   public void testReversedScanAll() throws Exception {
-    List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME), createScan().setReversed(true));
+    List<Result> results = doScan(createScan().setReversed(true));
     assertEquals(COUNT, results.size());
     IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1));
   }
@@ -102,8 +103,8 @@ public abstract class AbstractTestAsyncTableScan {
   @Test
   public void testScanNoStopKey() throws Exception {
     int start = 345;
-    List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME),
-      createScan().setStartRow(Bytes.toBytes(String.format("%03d", start))));
+    List<Result> results =
+        doScan(createScan().setStartRow(Bytes.toBytes(String.format("%03d", start))));
     assertEquals(COUNT - start, results.size());
     IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i));
   }
@@ -111,24 +112,24 @@ public abstract class AbstractTestAsyncTableScan {
   @Test
   public void testReverseScanNoStopKey() throws Exception {
     int start = 765;
-    List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME),
+    List<Result> results = doScan(
       createScan().setStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true));
     assertEquals(start + 1, results.size());
     IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i));
   }
 
   private void testScan(int start, int stop) throws Exception {
-    List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME),
-      createScan().setStartRow(Bytes.toBytes(String.format("%03d", start)))
-          .setStopRow(Bytes.toBytes(String.format("%03d", stop))));
+    List<Result> results =
+        doScan(createScan().setStartRow(Bytes.toBytes(String.format("%03d", start)))
+            .setStopRow(Bytes.toBytes(String.format("%03d", stop))));
     assertEquals(stop - start, results.size());
     IntStream.range(0, stop - start).forEach(i -> assertResultEquals(results.get(i), start + i));
   }
 
   private void testReversedScan(int start, int stop) throws Exception {
-    List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME),
-      createScan().setStartRow(Bytes.toBytes(String.format("%03d", start)))
-          .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setReversed(true));
+    List<Result> results =
+        doScan(createScan().setStartRow(Bytes.toBytes(String.format("%03d", start)))
+            .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setReversed(true));
     assertEquals(start - stop, results.size());
     IntStream.range(0, start - stop).forEach(i -> assertResultEquals(results.get(i), start - i));
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ff19f94/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
index b20e616..fe988aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java
@@ -87,7 +87,7 @@ public class TestAsyncGetMultiThread {
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
     CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
-    AsyncTable table = CONN.getTable(TABLE_NAME);
+    RawAsyncTable table = CONN.getRawTable(TABLE_NAME);
     List<CompletableFuture<?>> futures = new ArrayList<>();
     IntStream.range(0, COUNT)
         .forEach(i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i)))
@@ -105,8 +105,9 @@ public class TestAsyncGetMultiThread {
     while (!stop.get()) {
       int i = ThreadLocalRandom.current().nextInt(COUNT);
       assertEquals(i,
-        Bytes.toInt(CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i))))
-            .get().getValue(FAMILY, QUALIFIER)));
+        Bytes.toInt(
+          CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i)))).get()
+              .getValue(FAMILY, QUALIFIER)));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ff19f94/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
index 67d2661..0b3e186 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
@@ -99,7 +99,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
     int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegionInfo().getRegionName());
     TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(), Bytes.toBytes(
       TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName().getServerName()));
-    AsyncTable table = asyncConn.getTable(TABLE_NAME);
+    RawAsyncTable table = asyncConn.getRawTable(TABLE_NAME);
     table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
 
     // move back
@@ -185,7 +185,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
             return mockedLocator;
           }
         }) {
-      AsyncTable table = new AsyncTableImpl(mockedConn, TABLE_NAME);
+      RawAsyncTable table = new RawAsyncTableImpl(mockedConn, TABLE_NAME);
       table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
       assertTrue(errorTriggered.get());
       errorTriggered.set(false);

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ff19f94/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
index 8ba3414..7a85727 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
@@ -26,12 +26,15 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 import java.util.stream.IntStream;
 
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -47,7 +50,12 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
+@RunWith(Parameterized.class)
 @Category({ MediumTests.class, ClientTests.class })
 public class TestAsyncTable {
 
@@ -68,6 +76,23 @@ public class TestAsyncTable {
 
   private byte[] row;
 
+  @Parameter
+  public Supplier<AsyncTableBase> getTable;
+
+  private static RawAsyncTable getRawTable() {
+    return ASYNC_CONN.getRawTable(TABLE_NAME);
+  }
+
+  private static AsyncTable getTable() {
+    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
+  }
+
+  @Parameters
+  public static List<Object[]> params() {
+    return Arrays.asList(new Supplier<?>[] { TestAsyncTable::getRawTable },
+      new Supplier<?>[] { TestAsyncTable::getTable });
+  }
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.startMiniCluster(1);
@@ -89,7 +114,7 @@ public class TestAsyncTable {
 
   @Test
   public void testSimple() throws Exception {
-    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    AsyncTableBase table = getTable.get();
     table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
     assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
     Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
@@ -106,7 +131,7 @@ public class TestAsyncTable {
 
   @Test
   public void testSimpleMultiple() throws Exception {
-    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    AsyncTableBase table = getTable.get();
     int count = 100;
     CountDownLatch putLatch = new CountDownLatch(count);
     IntStream.range(0, count).forEach(
@@ -150,7 +175,7 @@ public class TestAsyncTable {
 
   @Test
   public void testIncrement() throws InterruptedException, ExecutionException {
-    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    AsyncTableBase table = getTable.get();
     int count = 100;
     CountDownLatch latch = new CountDownLatch(count);
     AtomicLong sum = new AtomicLong(0L);
@@ -167,7 +192,7 @@ public class TestAsyncTable {
 
   @Test
   public void testAppend() throws InterruptedException, ExecutionException {
-    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    AsyncTableBase table = getTable.get();
     int count = 10;
     CountDownLatch latch = new CountDownLatch(count);
     char suffix = ':';
@@ -190,7 +215,7 @@ public class TestAsyncTable {
 
   @Test
   public void testCheckAndPut() throws InterruptedException, ExecutionException {
-    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    AsyncTableBase table = getTable.get();
     AtomicInteger successCount = new AtomicInteger(0);
     AtomicInteger successIndex = new AtomicInteger(-1);
     int count = 10;
@@ -211,7 +236,7 @@ public class TestAsyncTable {
 
   @Test
   public void testCheckAndDelete() throws InterruptedException, ExecutionException {
-    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    AsyncTableBase table = getTable.get();
     int count = 10;
     CountDownLatch putLatch = new CountDownLatch(count + 1);
     table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
@@ -223,17 +248,16 @@ public class TestAsyncTable {
     AtomicInteger successCount = new AtomicInteger(0);
     AtomicInteger successIndex = new AtomicInteger(-1);
     CountDownLatch deleteLatch = new CountDownLatch(count);
-    IntStream.range(0, count)
-        .forEach(i -> table
-            .checkAndDelete(row, FAMILY, QUALIFIER, VALUE,
-              new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
-            .thenAccept(x -> {
-              if (x) {
-                successCount.incrementAndGet();
-                successIndex.set(i);
-              }
-              deleteLatch.countDown();
-            }));
+    IntStream.range(0, count).forEach(i -> table
+        .checkAndDelete(row, FAMILY, QUALIFIER, VALUE,
+          new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
+        .thenAccept(x -> {
+          if (x) {
+            successCount.incrementAndGet();
+            successIndex.set(i);
+          }
+          deleteLatch.countDown();
+        }));
     deleteLatch.await();
     assertEquals(1, successCount.get());
     Result result = table.get(new Get(row)).get();
@@ -248,7 +272,7 @@ public class TestAsyncTable {
 
   @Test
   public void testMutateRow() throws InterruptedException, ExecutionException, IOException {
-    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    AsyncTableBase table = getTable.get();
     RowMutations mutation = new RowMutations(row);
     mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
     table.mutateRow(mutation).get();
@@ -266,7 +290,7 @@ public class TestAsyncTable {
 
   @Test
   public void testCheckAndMutate() throws InterruptedException, ExecutionException {
-    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    AsyncTableBase table = getTable.get();
     int count = 10;
     CountDownLatch putLatch = new CountDownLatch(count + 1);
     table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ff19f94/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
index 840f844..c8e1c7a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
@@ -100,7 +100,7 @@ public class TestAsyncTableNoncedRetry {
 
   @Test
   public void testAppend() throws InterruptedException, ExecutionException {
-    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME);
     Result result = table.append(new Append(row).add(FAMILY, QUALIFIER, VALUE)).get();
     assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
     result = table.append(new Append(row).add(FAMILY, QUALIFIER, VALUE)).get();
@@ -112,7 +112,7 @@ public class TestAsyncTableNoncedRetry {
 
   @Test
   public void testIncrement() throws InterruptedException, ExecutionException {
-    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME);
     assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue());
     // the second call should have no effect as we always generate the same nonce.
     assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue());

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ff19f94/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
index d21560f..65fb1ad 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
@@ -124,11 +124,11 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
   }
 
   @Override
-  protected List<Result> doScan(AsyncTable table, Scan scan) throws Exception {
-    SimpleScanResultConsumer scanObserver = new SimpleScanResultConsumer();
-    table.scan(scan, scanObserver);
+  protected List<Result> doScan(Scan scan) throws Exception {
+    SimpleScanResultConsumer scanConsumer = new SimpleScanResultConsumer();
+    ASYNC_CONN.getRawTable(TABLE_NAME).scan(scan, scanConsumer);
     List<Result> results = new ArrayList<>();
-    for (Result result; (result = scanObserver.take()) != null;) {
+    for (Result result; (result = scanConsumer.take()) != null;) {
       results.add(result);
     }
     if (scan.getBatch() > 0) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ff19f94/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
index e920013..3737af2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
@@ -19,8 +19,11 @@ package org.apache.hadoop.hbase.client;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Supplier;
 import java.util.stream.IntStream;
 
 import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -28,19 +31,44 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
+@RunWith(Parameterized.class)
 @Category({ MediumTests.class, ClientTests.class })
 public class TestAsyncTableSmallScan extends AbstractTestAsyncTableScan {
 
+  @Parameter
+  public Supplier<AsyncTableBase> getTable;
+
+  private static RawAsyncTable getRawTable() {
+    return ASYNC_CONN.getRawTable(TABLE_NAME);
+  }
+
+  private static AsyncTable getTable() {
+    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
+  }
+
+  @Parameters
+  public static List<Object[]> params() {
+    return Arrays.asList(new Supplier<?>[] { TestAsyncTableSmallScan::getRawTable },
+      new Supplier<?>[] { TestAsyncTableSmallScan::getTable });
+  }
+
   @Test
   public void testScanWithLimit() throws InterruptedException, ExecutionException {
-    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    AsyncTableBase table = getTable.get();
     int start = 111;
     int stop = 888;
     int limit = 300;
-    List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
-        .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true),
-      limit).get();
+    List<Result> results =
+        table
+            .smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
+                .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true),
+              limit)
+            .get();
     assertEquals(limit, results.size());
     IntStream.range(0, limit).forEach(i -> {
       Result result = results.get(i);
@@ -52,7 +80,7 @@ public class TestAsyncTableSmallScan extends AbstractTestAsyncTableScan {
 
   @Test
   public void testReversedScanWithLimit() throws InterruptedException, ExecutionException {
-    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    AsyncTableBase table = getTable.get();
     int start = 888;
     int stop = 111;
     int limit = 300;
@@ -75,7 +103,7 @@ public class TestAsyncTableSmallScan extends AbstractTestAsyncTableScan {
   }
 
   @Override
-  protected List<Result> doScan(AsyncTable table, Scan scan) throws Exception {
-    return table.smallScan(scan).get();
+  protected List<Result> doScan(Scan scan) throws Exception {
+    return getTable.get().smallScan(scan).get();
   }
 }


[2/2] hbase git commit: HBASE-17141 Introduce a more user-friendly implementation of AsyncTable

Posted by zh...@apache.org.
HBASE-17141 Introduce a more user-friendly implementation of AsyncTable


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6ff19f94
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6ff19f94
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6ff19f94

Branch: refs/heads/master
Commit: 6ff19f94fe70f5815214c0d228c748a56468e8b9
Parents: b297f2d
Author: zhangduo <zh...@apache.org>
Authored: Tue Nov 22 11:56:28 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue Nov 22 14:44:51 2016 +0800

----------------------------------------------------------------------
 .../hbase/client/AbstractClientScanner.java     |  76 ----
 .../client/AllowPartialScanResultCache.java     |  26 +-
 .../hadoop/hbase/client/AsyncConnection.java    |  16 +-
 .../hbase/client/AsyncConnectionImpl.java       |  10 +-
 .../hadoop/hbase/client/AsyncRegionLocator.java |   4 +-
 .../apache/hadoop/hbase/client/AsyncTable.java  | 352 +---------------
 .../hadoop/hbase/client/AsyncTableBase.java     | 354 ++++++++++++++++
 .../hadoop/hbase/client/AsyncTableImpl.java     | 357 +++-------------
 .../client/ClientAsyncPrefetchScanner.java      |  12 +-
 .../hadoop/hbase/client/ClientScanner.java      |  11 +-
 .../hadoop/hbase/client/ConnectionUtils.java    |  34 ++
 .../hadoop/hbase/client/RawAsyncTable.java      |  61 +++
 .../hadoop/hbase/client/RawAsyncTableImpl.java  | 408 +++++++++++++++++++
 .../hadoop/hbase/client/ResultScanner.java      |  72 +++-
 .../client/AbstractTestAsyncTableScan.java      |  33 +-
 .../hbase/client/TestAsyncGetMultiThread.java   |   7 +-
 ...TestAsyncSingleRequestRpcRetryingCaller.java |   4 +-
 .../hadoop/hbase/client/TestAsyncTable.java     |  62 ++-
 .../hbase/client/TestAsyncTableNoncedRetry.java |   4 +-
 .../hadoop/hbase/client/TestAsyncTableScan.java |   8 +-
 .../hbase/client/TestAsyncTableSmallScan.java   |  42 +-
 21 files changed, 1122 insertions(+), 831 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6ff19f94/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
index 7658faf..87304c3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
@@ -17,10 +17,6 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 
@@ -52,76 +48,4 @@ public abstract class AbstractClientScanner implements ResultScanner {
   public ScanMetrics getScanMetrics() {
     return scanMetrics;
   }
-
-  /**
-   * Get nbRows rows.
-   * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
-   * setting (or hbase.client.scanner.caching in hbase-site.xml).
-   * @param nbRows number of rows to return
-   * @return Between zero and nbRows rowResults.  Scan is done
-   * if returned array is of zero-length (We never return null).
-   * @throws IOException
-   */
-  @Override
-  public Result [] next(int nbRows) throws IOException {
-    // Collect values to be returned here
-    ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
-    for(int i = 0; i < nbRows; i++) {
-      Result next = next();
-      if (next != null) {
-        resultSets.add(next);
-      } else {
-        break;
-      }
-    }
-    return resultSets.toArray(new Result[resultSets.size()]);
-  }
-
-  @Override
-  public Iterator<Result> iterator() {
-    return new Iterator<Result>() {
-      // The next RowResult, possibly pre-read
-      Result next = null;
-
-      // return true if there is another item pending, false if there isn't.
-      // this method is where the actual advancing takes place, but you need
-      // to call next() to consume it. hasNext() will only advance if there
-      // isn't a pending next().
-      @Override
-      public boolean hasNext() {
-        if (next == null) {
-          try {
-            next = AbstractClientScanner.this.next();
-            return next != null;
-          } catch (IOException e) {
-            throw new RuntimeException(e);
-          }
-        }
-        return true;
-      }
-
-      // get the pending next item and advance the iterator. returns null if
-      // there is no next item.
-      @Override
-      public Result next() {
-        // since hasNext() does the real advancing, we call this to determine
-        // if there is a next before proceeding.
-        if (!hasNext()) {
-          return null;
-        }
-
-        // if we get to here, then hasNext() has given us an item to return.
-        // we want to return the item and then null out the next pointer, so
-        // we use a temporary variable.
-        Result temp = next;
-        next = null;
-        return temp;
-      }
-
-      @Override
-      public void remove() {
-        throw new UnsupportedOperationException();
-      }
-    };
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ff19f94/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
index bc6e44e..ab26587 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
@@ -21,8 +21,6 @@ import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 /**
@@ -39,29 +37,7 @@ class AllowPartialScanResultCache implements ScanResultCache {
   private Cell lastCell;
 
   private Result filterCells(Result result) {
-    if (lastCell == null) {
-      return result;
-    }
-
-    // not the same row
-    if (!CellUtil.matchingRow(lastCell, result.getRow(), 0, result.getRow().length)) {
-      return result;
-    }
-    Cell[] rawCells = result.rawCells();
-    int index = Arrays.binarySearch(rawCells, lastCell, CellComparator::compareWithoutRow);
-    if (index < 0) {
-      index = -index - 1;
-    } else {
-      index++;
-    }
-    if (index == 0) {
-      return result;
-    }
-    if (index == rawCells.length) {
-      return null;
-    }
-    return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null,
-      result.isStale(), true);
+    return lastCell == null ? result : ConnectionUtils.filterCells(result, lastCell);
   }
 
   private void updateLastCell(Result result) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ff19f94/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
index 6dc0300..7b0f339 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.Closeable;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
@@ -49,6 +50,18 @@ public interface AsyncConnection extends Closeable {
   AsyncTableRegionLocator getRegionLocator(TableName tableName);
 
   /**
+   * Retrieve an RawAsyncTable implementation for accessing a table. The returned Table is not
+   * thread safe, a new instance should be created for each using thread. This is a lightweight
+   * operation, pooling or caching of the returned AsyncTable is neither required nor desired.
+   * <p>
+   * This method no longer checks table existence. An exception will be thrown if the table does not
+   * exist only when the first operation is attempted.
+   * @param tableName the name of the table
+   * @return an RawAsyncTable to use for interactions with this table
+   */
+  RawAsyncTable getRawTable(TableName tableName);
+
+  /**
    * Retrieve an AsyncTable implementation for accessing a table. The returned Table is not thread
    * safe, a new instance should be created for each using thread. This is a lightweight operation,
    * pooling or caching of the returned AsyncTable is neither required nor desired.
@@ -56,7 +69,8 @@ public interface AsyncConnection extends Closeable {
    * This method no longer checks table existence. An exception will be thrown if the table does not
    * exist only when the first operation is attempted.
    * @param tableName the name of the table
+   * @param pool the thread pool to use for executing callback
    * @return an AsyncTable to use for interactions with this table
    */
-  AsyncTable getTable(TableName tableName);
+  AsyncTable getTable(TableName tableName, ExecutorService pool);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ff19f94/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 70e024e..5c32a9f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.IOUtils;
@@ -148,7 +149,12 @@ class AsyncConnectionImpl implements AsyncConnection {
   }
 
   @Override
-  public AsyncTable getTable(TableName tableName) {
-    return new AsyncTableImpl(this, tableName);
+  public RawAsyncTable getRawTable(TableName tableName) {
+    return new RawAsyncTableImpl(this, tableName);
+  }
+
+  @Override
+  public AsyncTable getTable(TableName tableName, ExecutorService pool) {
+    return new AsyncTableImpl(this, tableName, pool);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ff19f94/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index ba5a0e0..6b74e4c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -262,7 +262,7 @@ class AsyncRegionLocator {
     }
     CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
     byte[] metaKey = createRegionName(tableName, row, NINES, false);
-    conn.getTable(META_TABLE_NAME)
+    conn.getRawTable(META_TABLE_NAME)
         .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
         .whenComplete(
           (results, error) -> onScanComplete(future, tableName, row, results, error, "row", loc -> {
@@ -327,7 +327,7 @@ class AsyncRegionLocator {
       metaKey = createRegionName(tableName, startRowOfCurrentRegion, ZEROES, false);
     }
     CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
-    conn.getTable(META_TABLE_NAME)
+    conn.getRawTable(META_TABLE_NAME)
         .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
         .whenComplete((results, error) -> onScanComplete(future, tableName, startRowOfCurrentRegion,
           results, error, "startRowOfCurrentRegion", loc -> {

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ff19f94/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index 851fbf1..e082d10 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -17,359 +17,17 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import com.google.common.base.Preconditions;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 /**
- * The asynchronous version of Table. Obtain an instance from a {@link AsyncConnection}.
+ * The asynchronous table for normal users.
  * <p>
- * The implementation is NOT required to be thread safe. Do NOT access it from multiple threads
- * concurrently.
- * <p>
- * Usually the implementations will not throw any exception directly, you need to get the exception
- * from the returned {@link CompletableFuture}.
+ * The implementation should make sure that user can do everything they want to the returned
+ * {@code CompletableFuture} without break anything. Usually the implementation will require user to
+ * provide a {@code ExecutorService}.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
-public interface AsyncTable {
-
-  /**
-   * Gets the fully qualified table name instance of this table.
-   */
-  TableName getName();
-
-  /**
-   * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
-   * <p>
-   * The reference returned is not a copy, so any change made to it will affect this instance.
-   */
-  Configuration getConfiguration();
-
-  /**
-   * Set timeout of each rpc read request in operations of this Table instance, will override the
-   * value of {@code hbase.rpc.read.timeout} in configuration. If a rpc read request waiting too
-   * long, it will stop waiting and send a new request to retry until retries exhausted or operation
-   * timeout reached.
-   */
-  void setReadRpcTimeout(long timeout, TimeUnit unit);
-
-  /**
-   * Get timeout of each rpc read request in this Table instance.
-   */
-  long getReadRpcTimeout(TimeUnit unit);
-
-  /**
-   * Set timeout of each rpc write request in operations of this Table instance, will override the
-   * value of {@code hbase.rpc.write.timeout} in configuration. If a rpc write request waiting too
-   * long, it will stop waiting and send a new request to retry until retries exhausted or operation
-   * timeout reached.
-   */
-  void setWriteRpcTimeout(long timeout, TimeUnit unit);
-
-  /**
-   * Get timeout of each rpc write request in this Table instance.
-   */
-  long getWriteRpcTimeout(TimeUnit unit);
-
-  /**
-   * Set timeout of each operation in this Table instance, will override the value of
-   * {@code hbase.client.operation.timeout} in configuration.
-   * <p>
-   * Operation timeout is a top-level restriction that makes sure an operation will not be blocked
-   * more than this. In each operation, if rpc request fails because of timeout or other reason, it
-   * will retry until success or throw a RetriesExhaustedException. But if the total time elapsed
-   * reach the operation timeout before retries exhausted, it will break early and throw
-   * SocketTimeoutException.
-   */
-  void setOperationTimeout(long timeout, TimeUnit unit);
-
-  /**
-   * Get timeout of each operation in Table instance.
-   */
-  long getOperationTimeout(TimeUnit unit);
-
-  /**
-   * Set timeout of a single operation in a scan, such as openScanner and next. Will override the
-   * value {@code hbase.client.scanner.timeout.period} in configuration.
-   * <p>
-   * Generally a scan will never timeout after we add heartbeat support unless the region is
-   * crashed. The {@code scanTimeout} works like the {@code operationTimeout} for each single
-   * operation in a scan.
-   */
-  void setScanTimeout(long timeout, TimeUnit unit);
-
-  /**
-   * Get the timeout of a single operation in a scan.
-   */
-  long getScanTimeout(TimeUnit unit);
-
-  /**
-   * Test for the existence of columns in the table, as specified by the Get.
-   * <p>
-   * This will return true if the Get matches one or more keys, false if not.
-   * <p>
-   * This is a server-side call so it prevents any data from being transfered to the client.
-   * @return true if the specified Get matches one or more keys, false if not. The return value will
-   *         be wrapped by a {@link CompletableFuture}.
-   */
-  default CompletableFuture<Boolean> exists(Get get) {
-    if (!get.isCheckExistenceOnly()) {
-      get = ReflectionUtils.newInstance(get.getClass(), get);
-      get.setCheckExistenceOnly(true);
-    }
-    return get(get).thenApply(r -> r.getExists());
-  }
-
-  /**
-   * Extracts certain cells from a given row.
-   * @param get The object that specifies what data to fetch and from which row.
-   * @return The data coming from the specified row, if it exists. If the row specified doesn't
-   *         exist, the {@link Result} instance returned won't contain any
-   *         {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The
-   *         return value will be wrapped by a {@link CompletableFuture}.
-   */
-  CompletableFuture<Result> get(Get get);
-
-  /**
-   * Puts some data to the table.
-   * @param put The data to put.
-   * @return A {@link CompletableFuture} that always returns null when complete normally.
-   */
-  CompletableFuture<Void> put(Put put);
-
-  /**
-   * Deletes the specified cells/row.
-   * @param delete The object that specifies what to delete.
-   * @return A {@link CompletableFuture} that always returns null when complete normally.
-   */
-  CompletableFuture<Void> delete(Delete delete);
-
-  /**
-   * Appends values to one or more columns within a single row.
-   * <p>
-   * This operation does not appear atomic to readers. Appends are done under a single row lock, so
-   * write operations to a row are synchronized, but readers do not take row locks so get and scan
-   * operations can see this operation partially completed.
-   * @param append object that specifies the columns and amounts to be used for the increment
-   *          operations
-   * @return values of columns after the append operation (maybe null). The return value will be
-   *         wrapped by a {@link CompletableFuture}.
-   */
-  CompletableFuture<Result> append(Append append);
-
-  /**
-   * Increments one or more columns within a single row.
-   * <p>
-   * This operation does not appear atomic to readers. Increments are done under a single row lock,
-   * so write operations to a row are synchronized, but readers do not take row locks so get and
-   * scan operations can see this operation partially completed.
-   * @param increment object that specifies the columns and amounts to be used for the increment
-   *          operations
-   * @return values of columns after the increment. The return value will be wrapped by a
-   *         {@link CompletableFuture}.
-   */
-  CompletableFuture<Result> increment(Increment increment);
-
-  /**
-   * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
-   * <p>
-   * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}.
-   * @param row The row that contains the cell to increment.
-   * @param family The column family of the cell to increment.
-   * @param qualifier The column qualifier of the cell to increment.
-   * @param amount The amount to increment the cell with (or decrement, if the amount is negative).
-   * @return The new value, post increment. The return value will be wrapped by a
-   *         {@link CompletableFuture}.
-   */
-  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
-      long amount) {
-    return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
-  }
-
-  /**
-   * Atomically increments a column value. If the column value already exists and is not a
-   * big-endian long, this could throw an exception. If the column value does not yet exist it is
-   * initialized to <code>amount</code> and written to the specified column.
-   * <p>
-   * Setting durability to {@link Durability#SKIP_WAL} means that in a fail scenario you will lose
-   * any increments that have not been flushed.
-   * @param row The row that contains the cell to increment.
-   * @param family The column family of the cell to increment.
-   * @param qualifier The column qualifier of the cell to increment.
-   * @param amount The amount to increment the cell with (or decrement, if the amount is negative).
-   * @param durability The persistence guarantee for this increment.
-   * @return The new value, post increment. The return value will be wrapped by a
-   *         {@link CompletableFuture}.
-   */
-  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
-      long amount, Durability durability) {
-    Preconditions.checkNotNull(row, "row is null");
-    Preconditions.checkNotNull(family, "family is null");
-    Preconditions.checkNotNull(qualifier, "qualifier is null");
-    return increment(
-      new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
-          .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
-  }
-
-  /**
-   * Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
-   * adds the put. If the passed value is null, the check is for the lack of column (ie:
-   * non-existence)
-   * @param row to check
-   * @param family column family to check
-   * @param qualifier column qualifier to check
-   * @param value the expected value
-   * @param put data to put if check succeeds
-   * @return true if the new put was executed, false otherwise. The return value will be wrapped by
-   *         a {@link CompletableFuture}.
-   */
-  default CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
-      byte[] value, Put put) {
-    return checkAndPut(row, family, qualifier, CompareOp.EQUAL, value, put);
-  }
-
-  /**
-   * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
-   * adds the put. If the passed value is null, the check is for the lack of column (ie:
-   * non-existence)
-   * @param row to check
-   * @param family column family to check
-   * @param qualifier column qualifier to check
-   * @param compareOp comparison operator to use
-   * @param value the expected value
-   * @param put data to put if check succeeds
-   * @return true if the new put was executed, false otherwise. The return value will be wrapped by
-   *         a {@link CompletableFuture}.
-   */
-  CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
-      CompareOp compareOp, byte[] value, Put put);
-
-  /**
-   * Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
-   * adds the delete. If the passed value is null, the check is for the lack of column (ie:
-   * non-existence)
-   * @param row to check
-   * @param family column family to check
-   * @param qualifier column qualifier to check
-   * @param value the expected value
-   * @param delete data to delete if check succeeds
-   * @return true if the new delete was executed, false otherwise. The return value will be wrapped
-   *         by a {@link CompletableFuture}.
-   */
-  default CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
-      byte[] value, Delete delete) {
-    return checkAndDelete(row, family, qualifier, CompareOp.EQUAL, value, delete);
-  }
-
-  /**
-   * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
-   * adds the delete. If the passed value is null, the check is for the lack of column (ie:
-   * non-existence)
-   * @param row to check
-   * @param family column family to check
-   * @param qualifier column qualifier to check
-   * @param compareOp comparison operator to use
-   * @param value the expected value
-   * @param delete data to delete if check succeeds
-   * @return true if the new delete was executed, false otherwise. The return value will be wrapped
-   *         by a {@link CompletableFuture}.
-   */
-  CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
-      CompareOp compareOp, byte[] value, Delete delete);
-
-  /**
-   * Performs multiple mutations atomically on a single row. Currently {@link Put} and
-   * {@link Delete} are supported.
-   * @param mutation object that specifies the set of mutations to perform atomically
-   * @return A {@link CompletableFuture} that always returns null when complete normally.
-   */
-  CompletableFuture<Void> mutateRow(RowMutations mutation);
-
-  /**
-   * Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
-   * performs the row mutations. If the passed value is null, the check is for the lack of column
-   * (ie: non-existence)
-   * @param row to check
-   * @param family column family to check
-   * @param qualifier column qualifier to check
-   * @param value the expected value
-   * @param mutation mutations to perform if check succeeds
-   * @return true if the new put was executed, false otherwise. The return value will be wrapped by
-   *         a {@link CompletableFuture}.
-   */
-  default CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
-      byte[] value, RowMutations mutation) {
-    return checkAndMutate(row, family, qualifier, CompareOp.EQUAL, value, mutation);
-  }
-
-  /**
-   * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
-   * performs the row mutations. If the passed value is null, the check is for the lack of column
-   * (ie: non-existence)
-   * @param row to check
-   * @param family column family to check
-   * @param qualifier column qualifier to check
-   * @param compareOp the comparison operator
-   * @param value the expected value
-   * @param mutation mutations to perform if check succeeds
-   * @return true if the new put was executed, false otherwise. The return value will be wrapped by
-   *         a {@link CompletableFuture}.
-   */
-  CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
-      CompareOp compareOp, byte[] value, RowMutations mutation);
-
-  /**
-   * Just call {@link #smallScan(Scan, int)} with {@link Integer#MAX_VALUE}.
-   * @see #smallScan(Scan, int)
-   */
-  default CompletableFuture<List<Result>> smallScan(Scan scan) {
-    return smallScan(scan, Integer.MAX_VALUE);
-  }
-
-  /**
-   * Return all the results that match the given scan object. The number of the returned results
-   * will not be greater than {@code limit}.
-   * <p>
-   * Notice that the scan must be small, and should not use batch or allowPartialResults. The
-   * {@code caching} property of the scan object is also ignored as we will use {@code limit}
-   * instead.
-   * @param scan A configured {@link Scan} object.
-   * @param limit the limit of results count
-   * @return The results of this small scan operation. The return value will be wrapped by a
-   *         {@link CompletableFuture}.
-   */
-  CompletableFuture<List<Result>> smallScan(Scan scan, int limit);
-
-  /**
-   * The basic scan API uses the observer pattern. All results that match the given scan object will
-   * be passed to the given {@code consumer} by calling {@link ScanResultConsumer#onNext(Result[])}.
-   * {@link ScanResultConsumer#onComplete()} means the scan is finished, and
-   * {@link ScanResultConsumer#onError(Throwable)} means we hit an unrecoverable error and the scan
-   * is terminated. {@link ScanResultConsumer#onHeartbeat()} means the RS is still working but we
-   * can not get a valid result to call {@link ScanResultConsumer#onNext(Result[])}. This is usually
-   * because the matched results are too sparse, for example, a filter which almost filters out
-   * everything is specified.
-   * <p>
-   * Notice that, the methods of the given {@code consumer} will be called directly in the rpc
-   * framework's callback thread, so typically you should not do any time consuming work inside
-   * these methods, otherwise you will be likely to block at least one connection to RS(even more if
-   * the rpc framework uses NIO).
-   * <p>
-   * This method is only for experts, do <strong>NOT</strong> use this method if you have other
-   * choice.
-   * @param scan A configured {@link Scan} object.
-   * @param consumer the consumer used to receive results.
-   */
-  void scan(Scan scan, ScanResultConsumer consumer);
+public interface AsyncTable extends AsyncTableBase {
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ff19f94/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
new file mode 100644
index 0000000..e051a6b
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.common.base.Preconditions;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+/**
+ * The base interface for asynchronous version of Table. Obtain an instance from a
+ * {@link AsyncConnection}.
+ * <p>
+ * The implementation is NOT required to be thread safe. Do NOT access it from multiple threads
+ * concurrently.
+ * <p>
+ * Usually the implementations will not throw any exception directly, you need to get the exception
+ * from the returned {@link CompletableFuture}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface AsyncTableBase {
+
+  /**
+   * Gets the fully qualified table name instance of this table.
+   */
+  TableName getName();
+
+  /**
+   * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
+   * <p>
+   * The reference returned is not a copy, so any change made to it will affect this instance.
+   */
+  Configuration getConfiguration();
+
+  /**
+   * Set timeout of each rpc read request in operations of this Table instance, will override the
+   * value of {@code hbase.rpc.read.timeout} in configuration. If a rpc read request waiting too
+   * long, it will stop waiting and send a new request to retry until retries exhausted or operation
+   * timeout reached.
+   */
+  void setReadRpcTimeout(long timeout, TimeUnit unit);
+
+  /**
+   * Get timeout of each rpc read request in this Table instance.
+   */
+  long getReadRpcTimeout(TimeUnit unit);
+
+  /**
+   * Set timeout of each rpc write request in operations of this Table instance, will override the
+   * value of {@code hbase.rpc.write.timeout} in configuration. If a rpc write request waiting too
+   * long, it will stop waiting and send a new request to retry until retries exhausted or operation
+   * timeout reached.
+   */
+  void setWriteRpcTimeout(long timeout, TimeUnit unit);
+
+  /**
+   * Get timeout of each rpc write request in this Table instance.
+   */
+  long getWriteRpcTimeout(TimeUnit unit);
+
+  /**
+   * Set timeout of each operation in this Table instance, will override the value of
+   * {@code hbase.client.operation.timeout} in configuration.
+   * <p>
+   * Operation timeout is a top-level restriction that makes sure an operation will not be blocked
+   * more than this. In each operation, if rpc request fails because of timeout or other reason, it
+   * will retry until success or throw a RetriesExhaustedException. But if the total time elapsed
+   * reach the operation timeout before retries exhausted, it will break early and throw
+   * SocketTimeoutException.
+   */
+  void setOperationTimeout(long timeout, TimeUnit unit);
+
+  /**
+   * Get timeout of each operation in Table instance.
+   */
+  long getOperationTimeout(TimeUnit unit);
+
+  /**
+   * Set timeout of a single operation in a scan, such as openScanner and next. Will override the
+   * value {@code hbase.client.scanner.timeout.period} in configuration.
+   * <p>
+   * Generally a scan will never timeout after we add heartbeat support unless the region is
+   * crashed. The {@code scanTimeout} works like the {@code operationTimeout} for each single
+   * operation in a scan.
+   */
+  void setScanTimeout(long timeout, TimeUnit unit);
+
+  /**
+   * Get the timeout of a single operation in a scan.
+   */
+  long getScanTimeout(TimeUnit unit);
+
+  /**
+   * Test for the existence of columns in the table, as specified by the Get.
+   * <p>
+   * This will return true if the Get matches one or more keys, false if not.
+   * <p>
+   * This is a server-side call so it prevents any data from being transfered to the client.
+   * @return true if the specified Get matches one or more keys, false if not. The return value will
+   *         be wrapped by a {@link CompletableFuture}.
+   */
+  default CompletableFuture<Boolean> exists(Get get) {
+    if (!get.isCheckExistenceOnly()) {
+      get = ReflectionUtils.newInstance(get.getClass(), get);
+      get.setCheckExistenceOnly(true);
+    }
+    return get(get).thenApply(r -> r.getExists());
+  }
+
+  /**
+   * Extracts certain cells from a given row.
+   * @param get The object that specifies what data to fetch and from which row.
+   * @return The data coming from the specified row, if it exists. If the row specified doesn't
+   *         exist, the {@link Result} instance returned won't contain any
+   *         {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The
+   *         return value will be wrapped by a {@link CompletableFuture}.
+   */
+  CompletableFuture<Result> get(Get get);
+
+  /**
+   * Puts some data to the table.
+   * @param put The data to put.
+   * @return A {@link CompletableFuture} that always returns null when complete normally.
+   */
+  CompletableFuture<Void> put(Put put);
+
+  /**
+   * Deletes the specified cells/row.
+   * @param delete The object that specifies what to delete.
+   * @return A {@link CompletableFuture} that always returns null when complete normally.
+   */
+  CompletableFuture<Void> delete(Delete delete);
+
+  /**
+   * Appends values to one or more columns within a single row.
+   * <p>
+   * This operation does not appear atomic to readers. Appends are done under a single row lock, so
+   * write operations to a row are synchronized, but readers do not take row locks so get and scan
+   * operations can see this operation partially completed.
+   * @param append object that specifies the columns and amounts to be used for the increment
+   *          operations
+   * @return values of columns after the append operation (maybe null). The return value will be
+   *         wrapped by a {@link CompletableFuture}.
+   */
+  CompletableFuture<Result> append(Append append);
+
+  /**
+   * Increments one or more columns within a single row.
+   * <p>
+   * This operation does not appear atomic to readers. Increments are done under a single row lock,
+   * so write operations to a row are synchronized, but readers do not take row locks so get and
+   * scan operations can see this operation partially completed.
+   * @param increment object that specifies the columns and amounts to be used for the increment
+   *          operations
+   * @return values of columns after the increment. The return value will be wrapped by a
+   *         {@link CompletableFuture}.
+   */
+  CompletableFuture<Result> increment(Increment increment);
+
+  /**
+   * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
+   * <p>
+   * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}.
+   * @param row The row that contains the cell to increment.
+   * @param family The column family of the cell to increment.
+   * @param qualifier The column qualifier of the cell to increment.
+   * @param amount The amount to increment the cell with (or decrement, if the amount is negative).
+   * @return The new value, post increment. The return value will be wrapped by a
+   *         {@link CompletableFuture}.
+   */
+  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
+      long amount) {
+    return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
+  }
+
+  /**
+   * Atomically increments a column value. If the column value already exists and is not a
+   * big-endian long, this could throw an exception. If the column value does not yet exist it is
+   * initialized to <code>amount</code> and written to the specified column.
+   * <p>
+   * Setting durability to {@link Durability#SKIP_WAL} means that in a fail scenario you will lose
+   * any increments that have not been flushed.
+   * @param row The row that contains the cell to increment.
+   * @param family The column family of the cell to increment.
+   * @param qualifier The column qualifier of the cell to increment.
+   * @param amount The amount to increment the cell with (or decrement, if the amount is negative).
+   * @param durability The persistence guarantee for this increment.
+   * @return The new value, post increment. The return value will be wrapped by a
+   *         {@link CompletableFuture}.
+   */
+  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
+      long amount, Durability durability) {
+    Preconditions.checkNotNull(row, "row is null");
+    Preconditions.checkNotNull(family, "family is null");
+    Preconditions.checkNotNull(qualifier, "qualifier is null");
+    return increment(
+      new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
+          .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
+  }
+
+  /**
+   * Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
+   * adds the put. If the passed value is null, the check is for the lack of column (ie:
+   * non-existence)
+   * @param row to check
+   * @param family column family to check
+   * @param qualifier column qualifier to check
+   * @param value the expected value
+   * @param put data to put if check succeeds
+   * @return true if the new put was executed, false otherwise. The return value will be wrapped by
+   *         a {@link CompletableFuture}.
+   */
+  default CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+      byte[] value, Put put) {
+    return checkAndPut(row, family, qualifier, CompareOp.EQUAL, value, put);
+  }
+
+  /**
+   * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
+   * adds the put. If the passed value is null, the check is for the lack of column (ie:
+   * non-existence)
+   * @param row to check
+   * @param family column family to check
+   * @param qualifier column qualifier to check
+   * @param compareOp comparison operator to use
+   * @param value the expected value
+   * @param put data to put if check succeeds
+   * @return true if the new put was executed, false otherwise. The return value will be wrapped by
+   *         a {@link CompletableFuture}.
+   */
+  CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+      CompareOp compareOp, byte[] value, Put put);
+
+  /**
+   * Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
+   * adds the delete. If the passed value is null, the check is for the lack of column (ie:
+   * non-existence)
+   * @param row to check
+   * @param family column family to check
+   * @param qualifier column qualifier to check
+   * @param value the expected value
+   * @param delete data to delete if check succeeds
+   * @return true if the new delete was executed, false otherwise. The return value will be wrapped
+   *         by a {@link CompletableFuture}.
+   */
+  default CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+      byte[] value, Delete delete) {
+    return checkAndDelete(row, family, qualifier, CompareOp.EQUAL, value, delete);
+  }
+
+  /**
+   * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
+   * adds the delete. If the passed value is null, the check is for the lack of column (ie:
+   * non-existence)
+   * @param row to check
+   * @param family column family to check
+   * @param qualifier column qualifier to check
+   * @param compareOp comparison operator to use
+   * @param value the expected value
+   * @param delete data to delete if check succeeds
+   * @return true if the new delete was executed, false otherwise. The return value will be wrapped
+   *         by a {@link CompletableFuture}.
+   */
+  CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+      CompareOp compareOp, byte[] value, Delete delete);
+
+  /**
+   * Performs multiple mutations atomically on a single row. Currently {@link Put} and
+   * {@link Delete} are supported.
+   * @param mutation object that specifies the set of mutations to perform atomically
+   * @return A {@link CompletableFuture} that always returns null when complete normally.
+   */
+  CompletableFuture<Void> mutateRow(RowMutations mutation);
+
+  /**
+   * Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
+   * performs the row mutations. If the passed value is null, the check is for the lack of column
+   * (ie: non-existence)
+   * @param row to check
+   * @param family column family to check
+   * @param qualifier column qualifier to check
+   * @param value the expected value
+   * @param mutation mutations to perform if check succeeds
+   * @return true if the new put was executed, false otherwise. The return value will be wrapped by
+   *         a {@link CompletableFuture}.
+   */
+  default CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
+      byte[] value, RowMutations mutation) {
+    return checkAndMutate(row, family, qualifier, CompareOp.EQUAL, value, mutation);
+  }
+
+  /**
+   * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
+   * performs the row mutations. If the passed value is null, the check is for the lack of column
+   * (ie: non-existence)
+   * @param row to check
+   * @param family column family to check
+   * @param qualifier column qualifier to check
+   * @param compareOp the comparison operator
+   * @param value the expected value
+   * @param mutation mutations to perform if check succeeds
+   * @return true if the new put was executed, false otherwise. The return value will be wrapped by
+   *         a {@link CompletableFuture}.
+   */
+  CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
+      CompareOp compareOp, byte[] value, RowMutations mutation);
+
+  /**
+   * Just call {@link #smallScan(Scan, int)} with {@link Integer#MAX_VALUE}.
+   * @see #smallScan(Scan, int)
+   */
+  default CompletableFuture<List<Result>> smallScan(Scan scan) {
+    return smallScan(scan, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Return all the results that match the given scan object. The number of the returned results
+   * will not be greater than {@code limit}.
+   * <p>
+   * Notice that the scan must be small, and should not use batch or allowPartialResults. The
+   * {@code caching} property of the scan object is also ignored as we will use {@code limit}
+   * instead.
+   * @param scan A configured {@link Scan} object.
+   * @param limit the limit of results count
+   * @return The results of this small scan operation. The return value will be wrapped by a
+   *         {@link CompletableFuture}.
+   */
+  CompletableFuture<List<Result>> smallScan(Scan scan, int limit);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ff19f94/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index c5afceb..6e1233d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -17,392 +17,143 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
-
-import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 /**
- * The implementation of AsyncTable.
+ * The implementation of AsyncTable. Based on {@link RawAsyncTable}.
  */
 @InterfaceAudience.Private
 class AsyncTableImpl implements AsyncTable {
 
-  private static final Log LOG = LogFactory.getLog(AsyncTableImpl.class);
-
-  private final AsyncConnectionImpl conn;
-
-  private final TableName tableName;
-
-  private final int defaultScannerCaching;
-
-  private final long defaultScannerMaxResultSize;
+  private final RawAsyncTable rawTable;
 
-  private long readRpcTimeoutNs;
+  private final ExecutorService pool;
 
-  private long writeRpcTimeoutNs;
-
-  private long operationTimeoutNs;
-
-  private long scanTimeoutNs;
-
-  public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) {
-    this.conn = conn;
-    this.tableName = tableName;
-    this.readRpcTimeoutNs = conn.connConf.getReadRpcTimeoutNs();
-    this.writeRpcTimeoutNs = conn.connConf.getWriteRpcTimeoutNs();
-    this.operationTimeoutNs = tableName.isSystemTable() ? conn.connConf.getMetaOperationTimeoutNs()
-        : conn.connConf.getOperationTimeoutNs();
-    this.defaultScannerCaching = conn.connConf.getScannerCaching();
-    this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
-    this.scanTimeoutNs = conn.connConf.getScanTimeoutNs();
+  public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName, ExecutorService pool) {
+    this.rawTable = conn.getRawTable(tableName);
+    this.pool = pool;
   }
 
   @Override
   public TableName getName() {
-    return tableName;
+    return rawTable.getName();
   }
 
   @Override
   public Configuration getConfiguration() {
-    return conn.getConfiguration();
-  }
-
-  @FunctionalInterface
-  private interface Converter<D, I, S> {
-    D convert(I info, S src) throws IOException;
+    return rawTable.getConfiguration();
   }
 
-  @FunctionalInterface
-  private interface RpcCall<RESP, REQ> {
-    void call(ClientService.Interface stub, HBaseRpcController controller, REQ req,
-        RpcCallback<RESP> done);
+  @Override
+  public void setReadRpcTimeout(long timeout, TimeUnit unit) {
+    rawTable.setReadRpcTimeout(timeout, unit);
   }
 
-  private static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call(
-      HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
-      Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall,
-      Converter<RESP, HBaseRpcController, PRESP> respConverter) {
-    CompletableFuture<RESP> future = new CompletableFuture<>();
-    try {
-      rpcCall.call(stub, controller, reqConvert.convert(loc.getRegionInfo().getRegionName(), req),
-        new RpcCallback<PRESP>() {
-
-          @Override
-          public void run(PRESP resp) {
-            if (controller.failed()) {
-              future.completeExceptionally(controller.getFailed());
-            } else {
-              try {
-                future.complete(respConverter.convert(controller, resp));
-              } catch (IOException e) {
-                future.completeExceptionally(e);
-              }
-            }
-          }
-        });
-    } catch (IOException e) {
-      future.completeExceptionally(e);
-    }
-    return future;
+  @Override
+  public long getReadRpcTimeout(TimeUnit unit) {
+    return rawTable.getReadRpcTimeout(unit);
   }
 
-  private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller,
-      HRegionLocation loc, ClientService.Interface stub, REQ req,
-      Converter<MutateRequest, byte[], REQ> reqConvert,
-      Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
-    return call(controller, loc, stub, req, reqConvert, (s, c, r, done) -> s.mutate(c, r, done),
-      respConverter);
+  @Override
+  public void setWriteRpcTimeout(long timeout, TimeUnit unit) {
+    rawTable.setWriteRpcTimeout(timeout, unit);
   }
 
-  private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller,
-      HRegionLocation loc, ClientService.Interface stub, REQ req,
-      Converter<MutateRequest, byte[], REQ> reqConvert) {
-    return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> {
-      return null;
-    });
+  @Override
+  public long getWriteRpcTimeout(TimeUnit unit) {
+    return rawTable.getWriteRpcTimeout(unit);
   }
 
-  private static Result toResult(HBaseRpcController controller, MutateResponse resp)
-      throws IOException {
-    if (!resp.hasResult()) {
-      return null;
-    }
-    return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner());
+  @Override
+  public void setOperationTimeout(long timeout, TimeUnit unit) {
+    rawTable.setOperationTimeout(timeout, unit);
   }
 
-  @FunctionalInterface
-  private interface NoncedConverter<D, I, S> {
-    D convert(I info, S src, long nonceGroup, long nonce) throws IOException;
+  @Override
+  public long getOperationTimeout(TimeUnit unit) {
+    return rawTable.getOperationTimeout(unit);
   }
 
-  private <REQ, RESP> CompletableFuture<RESP> noncedMutate(HBaseRpcController controller,
-      HRegionLocation loc, ClientService.Interface stub, REQ req,
-      NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
-      Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
-    long nonceGroup = conn.getNonceGenerator().getNonceGroup();
-    long nonce = conn.getNonceGenerator().newNonce();
-    return mutate(controller, loc, stub, req,
-      (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
+  @Override
+  public void setScanTimeout(long timeout, TimeUnit unit) {
+    rawTable.setScanTimeout(timeout, unit);
   }
 
-  private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long rpcTimeoutNs) {
-    return conn.callerFactory.<T> single().table(tableName).row(row)
-        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS);
+  @Override
+  public long getScanTimeout(TimeUnit unit) {
+    return rawTable.getScanTimeout(unit);
   }
 
-  private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
-    return newCaller(row.getRow(), rpcTimeoutNs);
+  private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
+    CompletableFuture<T> asyncFuture = new CompletableFuture<>();
+    future.whenCompleteAsync((r, e) -> {
+      if (e != null) {
+        asyncFuture.completeExceptionally(e);
+      } else {
+        asyncFuture.complete(r);
+      }
+    }, pool);
+    return asyncFuture;
   }
 
   @Override
   public CompletableFuture<Result> get(Get get) {
-    return this.<Result> newCaller(get, readRpcTimeoutNs)
-        .action((controller, loc, stub) -> AsyncTableImpl
-            .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
-              RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
-              (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
-        .call();
+    return wrap(rawTable.get(get));
   }
 
   @Override
   public CompletableFuture<Void> put(Put put) {
-    return this
-        .<Void> newCaller(put, writeRpcTimeoutNs).action((controller, loc, stub) -> AsyncTableImpl
-            .<Put> voidMutate(controller, loc, stub, put, RequestConverter::buildMutateRequest))
-        .call();
+    return wrap(rawTable.put(put));
   }
 
   @Override
   public CompletableFuture<Void> delete(Delete delete) {
-    return this.<Void> newCaller(delete, writeRpcTimeoutNs)
-        .action((controller, loc, stub) -> AsyncTableImpl.<Delete> voidMutate(controller, loc, stub,
-          delete, RequestConverter::buildMutateRequest))
-        .call();
+    return wrap(rawTable.delete(delete));
   }
 
   @Override
   public CompletableFuture<Result> append(Append append) {
-    checkHasFamilies(append);
-    return this.<Result> newCaller(append, writeRpcTimeoutNs)
-        .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub,
-          append, RequestConverter::buildMutateRequest, AsyncTableImpl::toResult))
-        .call();
+    return wrap(rawTable.append(append));
   }
 
   @Override
   public CompletableFuture<Result> increment(Increment increment) {
-    checkHasFamilies(increment);
-    return this.<Result> newCaller(increment, writeRpcTimeoutNs)
-        .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc,
-          stub, increment, RequestConverter::buildMutateRequest, AsyncTableImpl::toResult))
-        .call();
+    return wrap(rawTable.increment(increment));
   }
 
   @Override
   public CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
       CompareOp compareOp, byte[] value, Put put) {
-    return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
-        .action((controller, loc, stub) -> AsyncTableImpl.<Put, Boolean> mutate(controller, loc,
-          stub, put,
-          (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
-            new BinaryComparator(value), CompareType.valueOf(compareOp.name()), p),
-          (c, r) -> r.getProcessed()))
-        .call();
+    return wrap(rawTable.checkAndPut(row, family, qualifier, compareOp, value, put));
   }
 
   @Override
   public CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
       CompareOp compareOp, byte[] value, Delete delete) {
-    return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
-        .action((controller, loc, stub) -> AsyncTableImpl.<Delete, Boolean> mutate(controller, loc,
-          stub, delete,
-          (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
-            new BinaryComparator(value), CompareType.valueOf(compareOp.name()), d),
-          (c, r) -> r.getProcessed()))
-        .call();
-  }
-
-  // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
-  // so here I write a new method as I do not want to change the abstraction of call method.
-  private static <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
-      HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
-      Converter<MultiRequest, byte[], RowMutations> reqConvert,
-      Function<Result, RESP> respConverter) {
-    CompletableFuture<RESP> future = new CompletableFuture<>();
-    try {
-      byte[] regionName = loc.getRegionInfo().getRegionName();
-      MultiRequest req = reqConvert.convert(regionName, mutation);
-      stub.multi(controller, req, new RpcCallback<MultiResponse>() {
-
-        @Override
-        public void run(MultiResponse resp) {
-          if (controller.failed()) {
-            future.completeExceptionally(controller.getFailed());
-          } else {
-            try {
-              org.apache.hadoop.hbase.client.MultiResponse multiResp =
-                  ResponseConverter.getResults(req, resp, controller.cellScanner());
-              Throwable ex = multiResp.getException(regionName);
-              if (ex != null) {
-                future
-                    .completeExceptionally(ex instanceof IOException ? ex
-                        : new IOException(
-                            "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()),
-                            ex));
-              } else {
-                future.complete(respConverter
-                    .apply((Result) multiResp.getResults().get(regionName).result.get(0)));
-              }
-            } catch (IOException e) {
-              future.completeExceptionally(e);
-            }
-          }
-        }
-      });
-    } catch (IOException e) {
-      future.completeExceptionally(e);
-    }
-    return future;
+    return wrap(rawTable.checkAndDelete(row, family, qualifier, compareOp, value, delete));
   }
 
   @Override
   public CompletableFuture<Void> mutateRow(RowMutations mutation) {
-    return this.<Void> newCaller(mutation, writeRpcTimeoutNs).action((controller, loc,
-        stub) -> AsyncTableImpl.<Void> mutateRow(controller, loc, stub, mutation, (rn, rm) -> {
-          RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
-          regionMutationBuilder.setAtomic(true);
-          return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
-        }, (resp) -> {
-          return null;
-        })).call();
+    return wrap(rawTable.mutateRow(mutation));
   }
 
   @Override
   public CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
       CompareOp compareOp, byte[] value, RowMutations mutation) {
-    return this.<Boolean> newCaller(mutation, writeRpcTimeoutNs)
-        .action((controller, loc, stub) -> AsyncTableImpl.<Boolean> mutateRow(controller, loc, stub,
-          mutation,
-          (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
-            new BinaryComparator(value), CompareType.valueOf(compareOp.name()), rm),
-          (resp) -> resp.getExists()))
-        .call();
-  }
-
-  private <T> CompletableFuture<T> failedFuture(Throwable error) {
-    CompletableFuture<T> future = new CompletableFuture<>();
-    future.completeExceptionally(error);
-    return future;
-  }
-
-  private Scan setDefaultScanConfig(Scan scan) {
-    // always create a new scan object as we may reset the start row later.
-    Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
-    if (newScan.getCaching() <= 0) {
-      newScan.setCaching(defaultScannerCaching);
-    }
-    if (newScan.getMaxResultSize() <= 0) {
-      newScan.setMaxResultSize(defaultScannerMaxResultSize);
-    }
-    return newScan;
+    return wrap(rawTable.checkAndMutate(row, family, qualifier, compareOp, value, mutation));
   }
 
   @Override
   public CompletableFuture<List<Result>> smallScan(Scan scan, int limit) {
-    if (!scan.isSmall()) {
-      return failedFuture(new IllegalArgumentException("Only small scan is allowed"));
-    }
-    if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
-      return failedFuture(
-        new IllegalArgumentException("Batch and allowPartial is not allowed for small scan"));
-    }
-    return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan))
-        .limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
-        .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
-  }
-
-  public void scan(Scan scan, ScanResultConsumer consumer) {
-    if (scan.isSmall()) {
-      if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
-        consumer.onError(
-          new IllegalArgumentException("Batch and allowPartial is not allowed for small scan"));
-      } else {
-        LOG.warn("This is small scan " + scan + ", consider using smallScan directly?");
-      }
-    }
-    scan = setDefaultScanConfig(scan);
-    new AsyncClientScanner(scan, consumer, tableName, conn, scanTimeoutNs, readRpcTimeoutNs)
-        .start();
-  }
-
-  @Override
-  public void setReadRpcTimeout(long timeout, TimeUnit unit) {
-    this.readRpcTimeoutNs = unit.toNanos(timeout);
-  }
-
-  @Override
-  public long getReadRpcTimeout(TimeUnit unit) {
-    return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS);
-  }
-
-  @Override
-  public void setWriteRpcTimeout(long timeout, TimeUnit unit) {
-    this.writeRpcTimeoutNs = unit.toNanos(timeout);
-  }
-
-  @Override
-  public long getWriteRpcTimeout(TimeUnit unit) {
-    return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS);
-  }
-
-  @Override
-  public void setOperationTimeout(long timeout, TimeUnit unit) {
-    this.operationTimeoutNs = unit.toNanos(timeout);
-  }
-
-  @Override
-  public long getOperationTimeout(TimeUnit unit) {
-    return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
-  }
-
-  @Override
-  public void setScanTimeout(long timeout, TimeUnit unit) {
-    this.scanTimeoutNs = unit.toNanos(timeout);
-  }
-
-  @Override
-  public long getScanTimeout(TimeUnit unit) {
-    return TimeUnit.NANOSECONDS.convert(scanTimeoutNs, unit);
+    return wrap(rawTable.smallScan(scan, limit));
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ff19f94/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
index f0903db..ec33dd2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
@@ -17,11 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.util.Threads;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
 
 import java.io.IOException;
 import java.util.Queue;
@@ -31,6 +27,12 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.util.Threads;
+
 /**
  * ClientAsyncPrefetchScanner implements async scanner behaviour.
  * Specifically, the cache used by this scanner is a concurrent queue which allows both

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ff19f94/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index b7bdb83..20ed183 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
 
@@ -36,7 +37,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -554,15 +554,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
     return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults;
   }
 
-  protected long calcEstimatedSize(Result rs) {
-    long estimatedHeapSizeOfResult = 0;
-    // We don't make Iterator here
-    for (Cell cell : rs.rawCells()) {
-      estimatedHeapSizeOfResult += CellUtil.estimatedHeapSizeOf(cell);
-    }
-    return estimatedHeapSizeOfResult;
-  }
-
   protected void addEstimatedSize(long estimatedHeapSizeOfResult) {
     return;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ff19f94/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index d464c3b..9df9fbb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -35,6 +35,9 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -290,4 +293,35 @@ public final class ConnectionUtils {
     }
     return t;
   }
+
+  static long calcEstimatedSize(Result rs) {
+    long estimatedHeapSizeOfResult = 0;
+    // We don't make Iterator here
+    for (Cell cell : rs.rawCells()) {
+      estimatedHeapSizeOfResult += CellUtil.estimatedHeapSizeOf(cell);
+    }
+    return estimatedHeapSizeOfResult;
+  }
+
+  static Result filterCells(Result result, Cell keepCellsAfter) {
+    // not the same row
+    if (!CellUtil.matchingRow(keepCellsAfter, result.getRow(), 0, result.getRow().length)) {
+      return result;
+    }
+    Cell[] rawCells = result.rawCells();
+    int index = Arrays.binarySearch(rawCells, keepCellsAfter, CellComparator::compareWithoutRow);
+    if (index < 0) {
+      index = -index - 1;
+    } else {
+      index++;
+    }
+    if (index == 0) {
+      return result;
+    }
+    if (index == rawCells.length) {
+      return null;
+    }
+    return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null,
+      result.isStale(), true);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6ff19f94/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
new file mode 100644
index 0000000..14184b0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * A low level asynchronous table.
+ * <p>
+ * The returned {@code CompletableFuture} will be finished directly in the rpc framework's callback
+ * thread, so typically you should not do any time consuming work inside these methods, otherwise
+ * you will be likely to block at least one connection to RS(even more if the rpc framework uses
+ * NIO).
+ * <p>
+ * So, only experts that want to build high performance service should use this interface directly,
+ * especially for the {@link #scan(Scan, ScanResultConsumer)} below.
+ * <p>
+ * TODO: For now the only difference between this interface and {@link AsyncTable} is the scan
+ * method. The {@link ScanResultConsumer} exposes the implementation details of a scan(heartbeat) so
+ * it is not suitable for a normal user. If it is still the only difference after we implement most
+ * features of AsyncTable, we can think about merge these two interfaces.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface RawAsyncTable extends AsyncTableBase {
+
+  /**
+   * The basic scan API uses the observer pattern. All results that match the given scan object will
+   * be passed to the given {@code consumer} by calling {@link ScanResultConsumer#onNext(Result[])}.
+   * {@link ScanResultConsumer#onComplete()} means the scan is finished, and
+   * {@link ScanResultConsumer#onError(Throwable)} means we hit an unrecoverable error and the scan
+   * is terminated. {@link ScanResultConsumer#onHeartbeat()} means the RS is still working but we
+   * can not get a valid result to call {@link ScanResultConsumer#onNext(Result[])}. This is usually
+   * because the matched results are too sparse, for example, a filter which almost filters out
+   * everything is specified.
+   * <p>
+   * Notice that, the methods of the given {@code consumer} will be called directly in the rpc
+   * framework's callback thread, so typically you should not do any time consuming work inside
+   * these methods, otherwise you will be likely to block at least one connection to RS(even more if
+   * the rpc framework uses NIO).
+   * @param scan A configured {@link Scan} object.
+   * @param consumer the consumer used to receive results.
+   */
+  void scan(Scan scan, ScanResultConsumer consumer);
+}