You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/11/11 13:01:31 UTC

[2/2] hbase git commit: HBASE-16838 Implement basic scan

HBASE-16838 Implement basic scan


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8a6d6aa2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8a6d6aa2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8a6d6aa2

Branch: refs/heads/master
Commit: 8a6d6aa23944bbfd5047cf0c09fb4d8045735dab
Parents: fa838b0
Author: zhangduo <zh...@apache.org>
Authored: Fri Nov 11 16:47:42 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Fri Nov 11 21:01:16 2016 +0800

----------------------------------------------------------------------
 .../client/AllowPartialScanResultCache.java     |  98 ++++++
 .../hadoop/hbase/client/AsyncClientScanner.java | 151 ++++++++
 .../client/AsyncConnectionConfiguration.java    |  12 +-
 .../hbase/client/AsyncConnectionImpl.java       |   2 +-
 .../hbase/client/AsyncRegistryFactory.java      |  43 +++
 .../client/AsyncRpcRetryingCallerFactory.java   |  86 +++++
 .../AsyncScanSingleRegionRpcRetryingCaller.java | 351 +++++++++++++++++++
 .../AsyncSingleRequestRpcRetryingCaller.java    |  39 +--
 .../apache/hadoop/hbase/client/AsyncTable.java  |  37 ++
 .../hadoop/hbase/client/AsyncTableImpl.java     |  28 ++
 .../hadoop/hbase/client/ClientScanner.java      |   1 +
 .../hbase/client/ClusterRegistryFactory.java    |  43 ---
 .../hbase/client/CompleteScanResultCache.java   |  97 +++++
 .../hadoop/hbase/client/ConnectionUtils.java    |  26 ++
 .../hadoop/hbase/client/ScanResultCache.java    |  53 +++
 .../hadoop/hbase/client/ScanResultConsumer.java |  63 ++++
 .../client/AbstractTestAsyncTableScan.java      | 155 ++++++++
 .../client/TestAllowPartialScanResultCache.java |  92 +++++
 .../hadoop/hbase/client/TestAsyncTableScan.java | 147 ++++++++
 .../hbase/client/TestAsyncTableSmallScan.java   | 164 +--------
 .../TestCompleteResultScanResultCache.java      | 159 +++++++++
 21 files changed, 1614 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/8a6d6aa2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
new file mode 100644
index 0000000..bc6e44e
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A {@link ScanResultCache} that hands results back without assembling complete rows, so the
+ * results it returns may be partial.
+ * <p>
+ * After an error the scan can only be restarted from the beginning of a row, so this class also
+ * implements the logic that skips the cells of that row which have already been returned.
+ */
+@InterfaceAudience.Private
+class AllowPartialScanResultCache implements ScanResultCache {
+
+  // The last cell of the most recently returned result if that result was partial, otherwise
+  // null. A retry always restarts from the beginning of a row, and this cell tells us which cells
+  // of that row have to be filtered out.
+  private Cell lastCell;
+
+  /**
+   * Drops the leading cells of {@code result} that are located at or before {@link #lastCell}.
+   * @return {@code result} itself if nothing has to be dropped, {@code null} if every cell has to
+   *         be dropped, otherwise a new result holding only the remaining cells
+   */
+  private Result filterCells(Result result) {
+    if (lastCell == null
+        || !CellUtil.matchingRow(lastCell, result.getRow(), 0, result.getRow().length)) {
+      // either nothing is pending, or the result belongs to a different row
+      return result;
+    }
+    Cell[] cells = result.rawCells();
+    int pos = Arrays.binarySearch(cells, lastCell, CellComparator::compareWithoutRow);
+    // index of the first cell which has not been returned yet: right after the match, or the
+    // insertion point when there is no exact match
+    int firstNew = pos >= 0 ? pos + 1 : -pos - 1;
+    if (firstNew == 0) {
+      return result;
+    }
+    if (firstNew == cells.length) {
+      return null;
+    }
+    Cell[] remaining = Arrays.copyOfRange(cells, firstNew, cells.length);
+    return Result.create(remaining, null, result.isStale(), true);
+  }
+
+  /**
+   * Remembers the last cell of {@code result} if it is partial, otherwise forgets the current one.
+   */
+  private void updateLastCell(Result result) {
+    if (result.isPartial()) {
+      lastCell = result.rawCells()[result.rawCells().length - 1];
+    } else {
+      lastCell = null;
+    }
+  }
+
+  @Override
+  public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
+    int count = results.length;
+    if (count == 0) {
+      return EMPTY_RESULT_ARRAY;
+    }
+    // only the first result can overlap with what has already been returned
+    Result head = filterCells(results[0]);
+    if (count > 1) {
+      updateLastCell(results[count - 1]);
+      if (head == null) {
+        return Arrays.copyOfRange(results, 1, count);
+      }
+    } else {
+      if (head == null) {
+        // all cells were filtered out, so the last cell must stay as it is
+        return EMPTY_RESULT_ARRAY;
+      }
+      // note: this looks at the unfiltered result, it is replaced only afterwards
+      updateLastCell(results[0]);
+    }
+    results[0] = head;
+    return results;
+  }
+
+  @Override
+  public void clear() {
+    // this implementation does not cache anything, so there is nothing to release
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a6d6aa2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
new file mode 100644
index 0000000..504a44a
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+
+/**
+ * The asynchronous client scanner implementation.
+ * <p>
+ * Here we will call OpenScanner first and use the returned scannerId to create a
+ * {@link AsyncScanSingleRegionRpcRetryingCaller} to do the real scan operation. The return value of
+ * {@link AsyncScanSingleRegionRpcRetryingCaller} will tell us whether open a new scanner or finish
+ * scan.
+ */
+@InterfaceAudience.Private
+class AsyncClientScanner {
+
+  // We will use this scan object during the whole scan operation. The
+  // AsyncScanSingleRegionRpcRetryingCaller will modify this scan object directly.
+  private final Scan scan;
+
+  private final ScanResultConsumer consumer;
+
+  private final TableName tableName;
+
+  private final AsyncConnectionImpl conn;
+
+  private final long scanTimeoutNs;
+
+  private final long rpcTimeoutNs;
+
+  private final ScanResultCache resultCache;
+
+  public AsyncClientScanner(Scan scan, ScanResultConsumer consumer, TableName tableName,
+      AsyncConnectionImpl conn, long scanTimeoutNs, long rpcTimeoutNs) {
+    if (scan.getStartRow() == null) {
+      scan.setStartRow(EMPTY_START_ROW);
+    }
+    if (scan.getStopRow() == null) {
+      scan.setStopRow(EMPTY_END_ROW);
+    }
+    this.scan = scan;
+    this.consumer = consumer;
+    this.tableName = tableName;
+    this.conn = conn;
+    this.scanTimeoutNs = scanTimeoutNs;
+    this.rpcTimeoutNs = rpcTimeoutNs;
+    this.resultCache = scan.getAllowPartialResults() || scan.getBatch() > 0
+        ? new AllowPartialScanResultCache() : new CompleteScanResultCache();
+  }
+
+  private static final class OpenScannerResponse {
+
+    public final HRegionLocation loc;
+
+    public final ClientService.Interface stub;
+
+    public final long scannerId;
+
+    public OpenScannerResponse(HRegionLocation loc, Interface stub, long scannerId) {
+      this.loc = loc;
+      this.stub = stub;
+      this.scannerId = scannerId;
+    }
+  }
+
+  private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
+      HRegionLocation loc, ClientService.Interface stub) {
+    CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
+    try {
+      ScanRequest request =
+          RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), scan, 0, false);
+      stub.scan(controller, request, resp -> {
+        if (controller.failed()) {
+          future.completeExceptionally(controller.getFailed());
+          return;
+        }
+        future.complete(new OpenScannerResponse(loc, stub, resp.getScannerId()));
+      });
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  private void startScan(OpenScannerResponse resp) {
+    conn.callerFactory.scanSingleRegion().id(resp.scannerId).location(resp.loc).stub(resp.stub)
+        .setScan(scan).consumer(consumer).resultCache(resultCache)
+        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+        .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).start()
+        .whenComplete((locateToPreviousRegion, error) -> {
+          if (error != null) {
+            consumer.onError(error);
+            return;
+          }
+          if (locateToPreviousRegion == null) {
+            consumer.onComplete();
+          } else {
+            openScanner(locateToPreviousRegion.booleanValue());
+          }
+        });
+  }
+
+  private void openScanner(boolean locateToPreviousRegion) {
+    conn.callerFactory.<OpenScannerResponse> single().table(tableName).row(scan.getStartRow())
+        .locateToPreviousRegion(locateToPreviousRegion)
+        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+        .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::callOpenScanner).call()
+        .whenComplete((resp, error) -> {
+          if (error != null) {
+            consumer.onError(error);
+            return;
+          }
+          startScan(resp);
+        });
+  }
+
+  public void start() {
+    openScanner(scan.isReversed() && isEmptyStartRow(scan.getStartRow()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a6d6aa2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index aaac845..6279d46 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -52,6 +52,8 @@ class AsyncConnectionConfiguration {
 
   private final long metaOperationTimeoutNs;
 
+  // timeout for a whole operation such as get, put or delete. Notice that scan will not be affected
+  // by this value, see scanTimeoutNs.
   private final long operationTimeoutNs;
 
   // timeout for each read rpc request
@@ -67,6 +69,10 @@ class AsyncConnectionConfiguration {
   /** How many retries are allowed before we start to log */
   private final int startLogErrorsCnt;
 
+  // As we now have heartbeat support for scan, ideally a scan will never time out unless the RS
+  // has crashed. The RS will always return something before the rpc timeout or scan timeout to
+  // tell the client that it is still alive. The scan timeout is used as the operation timeout for
+  // every operation in a scan, such as openScanner or next.
   private final long scanTimeoutNs;
 
   private final int scannerCaching;
@@ -125,15 +131,15 @@ class AsyncConnectionConfiguration {
     return startLogErrorsCnt;
   }
 
-  public long getScanTimeoutNs() {
+  long getScanTimeoutNs() {
     return scanTimeoutNs;
   }
 
-  public int getScannerCaching() {
+  int getScannerCaching() {
     return scannerCaching;
   }
 
-  public long getScannerMaxResultSize() {
+  long getScannerMaxResultSize() {
     return scannerMaxResultSize;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a6d6aa2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 6cad6a2..70e024e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -92,7 +92,7 @@ class AsyncConnectionImpl implements AsyncConnection {
     this.user = user;
     this.connConf = new AsyncConnectionConfiguration(conf);
     this.locator = new AsyncRegionLocator(this);
-    this.registry = ClusterRegistryFactory.getRegistry(conf);
+    this.registry = AsyncRegistryFactory.getRegistry(conf);
     this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> {
       if (LOG.isDebugEnabled()) {
         LOG.debug("cluster id came back null, using default " + CLUSTER_ID_DEFAULT);

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a6d6aa2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java
new file mode 100644
index 0000000..2fc3322
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+/**
+ * Factory for the {@link AsyncRegistry} implementation selected by the configuration.
+ */
+@InterfaceAudience.Private
+final class AsyncRegistryFactory {
+
+  // configuration key which names the AsyncRegistry implementation class
+  static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl";
+
+  private AsyncRegistryFactory() {
+    // only static members, never instantiated
+  }
+
+  /**
+   * @return an instance of the class looked up under {@link #REGISTRY_IMPL_CONF_KEY}, with
+   *         {@link ZKAsyncRegistry} passed as the default for the lookup.
+   */
+  static AsyncRegistry getRegistry(Configuration conf) {
+    Class<? extends AsyncRegistry> registryClass =
+        conf.getClass(REGISTRY_IMPL_CONF_KEY, ZKAsyncRegistry.class, AsyncRegistry.class);
+    return ReflectionUtils.newInstance(registryClass, conf);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a6d6aa2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 0d23c39..d0bbcdb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -26,8 +26,10 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 
 /**
  * Factory to create an AsyncRpcRetryCaller.
@@ -171,4 +173,88 @@ class AsyncRpcRetryingCallerFactory {
   public SmallScanCallerBuilder smallScan() {
     return new SmallScanCallerBuilder();
   }
+
+  /**
+   * Builder for {@link AsyncScanSingleRegionRpcRetryingCaller}.
+   * <p>
+   * The scanner id, scan, result cache, consumer, stub and location must all be set, they are
+   * validated in {@link #build()}.
+   */
+  public class ScanSingleRegionCallerBuilder {
+
+    // -1 means "not set yet", build() rejects a negative scanner id
+    private long scannerId = -1L;
+
+    private Scan scan;
+
+    private ScanResultCache resultCache;
+
+    private ScanResultConsumer consumer;
+
+    private ClientService.Interface stub;
+
+    private HRegionLocation loc;
+
+    private long scanTimeoutNs;
+
+    private long rpcTimeoutNs;
+
+    public ScanSingleRegionCallerBuilder id(long scannerId) {
+      this.scannerId = scannerId;
+      return this;
+    }
+
+    public ScanSingleRegionCallerBuilder setScan(Scan scan) {
+      this.scan = scan;
+      return this;
+    }
+
+    public ScanSingleRegionCallerBuilder resultCache(ScanResultCache resultCache) {
+      this.resultCache = resultCache;
+      return this;
+    }
+
+    public ScanSingleRegionCallerBuilder consumer(ScanResultConsumer consumer) {
+      this.consumer = consumer;
+      return this;
+    }
+
+    public ScanSingleRegionCallerBuilder stub(ClientService.Interface stub) {
+      this.stub = stub;
+      return this;
+    }
+
+    public ScanSingleRegionCallerBuilder location(HRegionLocation loc) {
+      this.loc = loc;
+      return this;
+    }
+
+    /**
+     * Sets the scan timeout, it is stored in nanoseconds.
+     */
+    public ScanSingleRegionCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) {
+      this.scanTimeoutNs = unit.toNanos(scanTimeout);
+      return this;
+    }
+
+    /**
+     * Sets the rpc timeout, it is stored in nanoseconds.
+     */
+    public ScanSingleRegionCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
+      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
+      return this;
+    }
+
+    /**
+     * Validates the collected values and creates the caller. Pause, max retries and the start log
+     * errors count are taken from the connection configuration.
+     */
+    public AsyncScanSingleRegionRpcRetryingCaller build() {
+      // The message template of Preconditions only substitutes %s, a %d would show up literally
+      // in the error message with the value appended in brackets.
+      checkArgument(scannerId >= 0, "invalid scannerId %s", scannerId);
+      return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn,
+          checkNotNull(scan, "scan is null"), scannerId,
+          checkNotNull(resultCache, "resultCache is null"),
+          checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
+          checkNotNull(loc, "location is null"), conn.connConf.getPauseNs(),
+          conn.connConf.getMaxRetries(), scanTimeoutNs, rpcTimeoutNs,
+          conn.connConf.getStartLogErrorsCnt());
+    }
+
+    /**
+     * Short cut for {@code build().start()}.
+     */
+    public CompletableFuture<Boolean> start() {
+      return build().start();
+    }
+  }
+
+  /**
+   * Create a builder for the retry caller which scans a single region.
+   */
+  public ScanSingleRegionCallerBuilder scanSingleRegion() {
+    return new ScanSingleRegionCallerBuilder();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a6d6aa2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
new file mode 100644
index 0000000..0efac7f
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
+
+import io.netty.util.HashedWheelTimer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Retry caller for scanning a region.
+ * <p>
+ * We will modify the {@link Scan} object passed in directly. The upper layer should store the
+ * reference of this object and use it to open new single region scanners.
+ * <p>
+ * The future returned by {@link #start()} is completed with {@code null} when the whole scan
+ * should stop, with the locate direction for the next open scanner call when another scanner has
+ * to be opened, or exceptionally with a {@link RetriesExhaustedException} when we give up.
+ */
+@InterfaceAudience.Private
+class AsyncScanSingleRegionRpcRetryingCaller {
+
+  private static final Log LOG = LogFactory.getLog(AsyncScanSingleRegionRpcRetryingCaller.class);
+
+  private final HashedWheelTimer retryTimer;
+
+  private final Scan scan;
+
+  private final long scannerId;
+
+  private final ScanResultCache resultCache;
+
+  private final ScanResultConsumer consumer;
+
+  private final ClientService.Interface stub;
+
+  private final HRegionLocation loc;
+
+  private final long pauseNs;
+
+  private final int maxAttempts;
+
+  private final long scanTimeoutNs;
+
+  private final long rpcTimeoutNs;
+
+  private final int startLogErrorsCnt;
+
+  // The next two fields point to the forward or the reversed implementation, selected once in the
+  // constructor according to scan.isReversed().
+  private final Supplier<byte[]> createNextStartRowWhenError;
+
+  private final Runnable completeWhenNoMoreResultsInRegion;
+
+  private final CompletableFuture<Boolean> future;
+
+  private final HBaseRpcController controller;
+
+  // row of the last result passed to the consumer, null if nothing has been passed yet
+  private byte[] nextStartRowWhenError;
+
+  // true if the last result passed to the consumer was partial, in which case a new scanner has
+  // to start from nextStartRowWhenError itself instead of the closest row after/before it
+  private boolean includeNextStartRowWhenError;
+
+  // System.nanoTime() taken when the current next call was issued for the first time
+  private long nextCallStartNs;
+
+  // number of the current attempt of the current next call, starting from 1
+  private int tries = 1;
+
+  private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
+
+  // incremented before every next call, so the first call is sent with sequence 0
+  private long nextCallSeq = -1L;
+
+  public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer,
+      AsyncConnectionImpl conn, Scan scan, long scannerId, ScanResultCache resultCache,
+      ScanResultConsumer consumer, Interface stub, HRegionLocation loc, long pauseNs,
+      int maxRetries, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+    this.retryTimer = retryTimer;
+    this.scan = scan;
+    this.scannerId = scannerId;
+    this.resultCache = resultCache;
+    this.consumer = consumer;
+    this.stub = stub;
+    this.loc = loc;
+    this.pauseNs = pauseNs;
+    this.maxAttempts = retries2Attempts(maxRetries);
+    this.scanTimeoutNs = scanTimeoutNs;
+    this.rpcTimeoutNs = rpcTimeoutNs;
+    this.startLogErrorsCnt = startLogErrorsCnt;
+    if (scan.isReversed()) {
+      createNextStartRowWhenError = this::createReversedNextStartRowWhenError;
+      completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion;
+    } else {
+      createNextStartRowWhenError = this::createNextStartRowWhenError;
+      completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion;
+    }
+    this.future = new CompletableFuture<>();
+    this.controller = conn.rpcControllerFactory.newController();
+    this.exceptions = new ArrayList<>();
+  }
+
+  /**
+   * @return milliseconds passed since the current next call was issued for the first time.
+   */
+  private long elapsedMs() {
+    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
+  }
+
+  /**
+   * Sends a close request for our scanner. A failure is only logged and otherwise ignored.
+   */
+  private void closeScanner() {
+    resetController(controller, rpcTimeoutNs);
+    ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
+    stub.scan(controller, req, resp -> {
+      if (controller.failed()) {
+        LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId
+            + " for " + loc.getRegionInfo().getEncodedName() + " of "
+            + loc.getRegionInfo().getTable() + " failed, ignore, probably already closed",
+          controller.getFailed());
+      }
+    });
+  }
+
+  /**
+   * Gives up: fails the future with a {@link RetriesExhaustedException} carrying the exceptions
+   * collected so far.
+   * @param closeScanner whether we should also try to close the scanner at server side
+   */
+  private void completeExceptionally(boolean closeScanner) {
+    resultCache.clear();
+    if (closeScanner) {
+      closeScanner();
+    }
+    future.completeExceptionally(new RetriesExhaustedException(tries, exceptions));
+  }
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
+      justification = "https://github.com/findbugsproject/findbugs/issues/79")
+  private void completeNoMoreResults() {
+    future.complete(null);
+  }
+
+  /**
+   * Moves the start row of the scan to {@code nextStartRow} and asks the upper layer to open a
+   * scanner for it. The locate direction we complete with is {@code scan.isReversed()}.
+   */
+  private void completeWithNextStartRow(byte[] nextStartRow) {
+    scan.setStartRow(nextStartRow);
+    future.complete(scan.isReversed());
+  }
+
+  private byte[] createNextStartRowWhenError() {
+    return createClosestRowAfter(nextStartRowWhenError);
+  }
+
+  private byte[] createReversedNextStartRowWhenError() {
+    return createClosestRowBefore(nextStartRowWhenError);
+  }
+
+  /**
+   * Called when the current scanner can not be used any more but the scan itself can go on. We
+   * adjust the start row of the scan according to what has already been passed to the consumer
+   * and complete the future so that the upper layer opens a new scanner.
+   * @param closeScanner whether we should also try to close the scanner at server side
+   */
+  private void completeWhenError(boolean closeScanner) {
+    resultCache.clear();
+    if (closeScanner) {
+      closeScanner();
+    }
+    if (nextStartRowWhenError != null) {
+      scan.setStartRow(
+        includeNextStartRowWhenError ? nextStartRowWhenError : createNextStartRowWhenError.get());
+    }
+    future.complete(
+      scan.isReversed() && Bytes.equals(scan.getStartRow(), loc.getRegionInfo().getEndKey()));
+  }
+
+  /**
+   * Handles a failed next call. Depending on the error, the number of attempts and the time which
+   * has already been spent, we give up, let the upper layer open a new scanner, or schedule a
+   * retry of the same call.
+   */
+  private void onError(Throwable error) {
+    error = translateException(error);
+    if (tries > startLogErrorsCnt) {
+      LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for "
+          + loc.getRegionInfo().getEncodedName() + " of " + loc.getRegionInfo().getTable()
+          + " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
+          + TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs()
+          + " ms",
+        error);
+    }
+    // for these errors we treat the scanner as already gone, so no close call is sent
+    boolean scannerClosed =
+        error instanceof UnknownScannerException || error instanceof NotServingRegionException
+            || error instanceof RegionServerStoppedException;
+    RetriesExhaustedException.ThrowableWithExtraContext qt =
+        new RetriesExhaustedException.ThrowableWithExtraContext(error,
+            EnvironmentEdgeManager.currentTime(), "");
+    exceptions.add(qt);
+    if (tries >= maxAttempts) {
+      completeExceptionally(!scannerClosed);
+      return;
+    }
+    long delayNs;
+    if (scanTimeoutNs > 0) {
+      long maxDelayNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
+      if (maxDelayNs <= 0) {
+        completeExceptionally(!scannerClosed);
+        return;
+      }
+      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+    } else {
+      delayNs = getPauseTime(pauseNs, tries - 1);
+    }
+    if (scannerClosed) {
+      completeWhenError(false);
+      return;
+    }
+    if (error instanceof OutOfOrderScannerNextException || error instanceof ScannerResetException) {
+      completeWhenError(true);
+      return;
+    }
+    if (error instanceof DoNotRetryIOException) {
+      completeExceptionally(true);
+      return;
+    }
+    tries++;
+    retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
+  }
+
+  private void updateNextStartRowWhenError(Result result) {
+    nextStartRowWhenError = result.getRow();
+    includeNextStartRowWhenError = result.isPartial();
+  }
+
+  /**
+   * Forward scan: the current region is exhausted, so either finish the scan or move the start
+   * row to the end key of the region to continue with the next region.
+   */
+  private void completeWhenNoMoreResultsInRegion() {
+    byte[] endKey = loc.getRegionInfo().getEndKey();
+    // An empty end key means this is the last region, so there is no next region to move to even
+    // if the scan has a stop row; moving the start row to the empty end key would restart the
+    // scan from the first region. Otherwise we are done once the end key reaches the stop row.
+    if (isEmptyStopRow(endKey) || (!isEmptyStopRow(scan.getStopRow())
+        && Bytes.compareTo(endKey, scan.getStopRow()) >= 0)) {
+      completeNoMoreResults();
+      // do not fall through, otherwise the start row of the shared scan object would be changed
+      // although the scan has already been finished
+      return;
+    }
+    completeWithNextStartRow(endKey);
+  }
+
+  /**
+   * Reversed scan: the current region is exhausted, so either finish the scan or move the start
+   * row to the start key of the region to continue with the previous region.
+   */
+  private void completeReversedWhenNoMoreResultsInRegion() {
+    byte[] startKey = loc.getRegionInfo().getStartKey();
+    boolean finished;
+    if (isEmptyStopRow(scan.getStopRow())) {
+      // no stop row, so we are done only when the start key of the region is empty
+      finished = isEmptyStartRow(startKey);
+    } else {
+      finished = Bytes.compareTo(startKey, scan.getStopRow()) <= 0;
+    }
+    if (finished) {
+      completeNoMoreResults();
+      // do not fall through, otherwise the start row of the shared scan object would be changed
+      // although the scan has already been finished
+      return;
+    }
+    completeWithNextStartRow(startKey);
+  }
+
+  /**
+   * Callback of the scan rpc. Passes the results (or a heartbeat) to the consumer and then decides
+   * whether to finish, to move on to another region, or to issue the next call on this scanner.
+   */
+  private void onComplete(ScanResponse resp) {
+    if (controller.failed()) {
+      onError(controller.getFailed());
+      return;
+    }
+    boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
+    Result[] results;
+    try {
+      results = resultCache.addAndGet(
+        Optional.ofNullable(ResponseConverter.getResults(controller.cellScanner(), resp))
+            .orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
+        isHeartbeatMessage);
+    } catch (IOException e) {
+      // We can not retry here. The server has responded normally and the call sequence has been
+      // increased so a new scan with the same call sequence will cause an
+      // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
+      LOG.warn("decode scan response failed", e);
+      completeWhenError(true);
+      return;
+    }
+
+    boolean stopByUser;
+    if (results.length == 0) {
+      // if we have nothing to return then this must be a heartbeat message.
+      stopByUser = !consumer.onHeartbeat();
+    } else {
+      updateNextStartRowWhenError(results[results.length - 1]);
+      stopByUser = !consumer.onNext(results);
+    }
+    if (resp.hasMoreResults() && !resp.getMoreResults()) {
+      // RS tells us there is no more data for the whole scan
+      completeNoMoreResults();
+      return;
+    }
+    if (stopByUser) {
+      if (resp.getMoreResultsInRegion()) {
+        // we have more results in region but user request to stop the scan, so we need to close the
+        // scanner explicitly.
+        closeScanner();
+      }
+      completeNoMoreResults();
+      return;
+    }
+    // as in 2.0 this value will always be set
+    if (!resp.getMoreResultsInRegion()) {
+      completeWhenNoMoreResultsInRegion.run();
+      return;
+    }
+    next();
+  }
+
+  /**
+   * Sends the scan request with the current call sequence, also used for retrying a failed call.
+   */
+  private void call() {
+    resetController(controller, rpcTimeoutNs);
+    ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
+      nextCallSeq, false, false);
+    stub.scan(controller, req, this::onComplete);
+  }
+
+  /**
+   * Starts a new next call: advances the call sequence and resets the retry state.
+   */
+  private void next() {
+    nextCallSeq++;
+    // Attempts are counted from 1, see the initial value of the field. onError compares tries with
+    // maxAttempts and uses tries - 1 as the backoff index, so starting from 0 here would allow one
+    // attempt more than configured.
+    tries = 1;
+    exceptions.clear();
+    nextCallStartNs = System.nanoTime();
+    call();
+  }
+
+  /**
+   * @return return locate direction for next open scanner call, or null if we should stop.
+   */
+  public CompletableFuture<Boolean> start() {
+    next();
+    return future;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a6d6aa2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index f10c9a5..36687c6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -18,14 +18,13 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
 
 import io.netty.util.HashedWheelTimer;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
 
 import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -40,11 +39,9 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.ipc.RemoteException;
 
 /**
  * Retry caller for a single request, such as get, put, delete, etc.
@@ -121,19 +118,6 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
     return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
   }
 
-  private static Throwable translateException(Throwable t) {
-    if (t instanceof UndeclaredThrowableException && t.getCause() != null) {
-      t = t.getCause();
-    }
-    if (t instanceof RemoteException) {
-      t = ((RemoteException) t).unwrapRemoteException();
-    }
-    if (t instanceof ServiceException && t.getCause() != null) {
-      t = translateException(t.getCause());
-    }
-    return t;
-  }
-
   private void completeExceptionally() {
     future.completeExceptionally(new RetriesExhaustedException(tries, exceptions));
   }
@@ -165,22 +149,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
     }
     updateCachedLocation.accept(error);
     tries++;
-    retryTimer.newTimeout(new TimerTask() {
-
-      @Override
-      public void run(Timeout timeout) throws Exception {
-        // always restart from beginning.
-        locateThenCall();
-      }
-    }, delayNs, TimeUnit.NANOSECONDS);
-  }
-
-  private void resetController() {
-    controller.reset();
-    if (rpcTimeoutNs >= 0) {
-      controller.setCallTimeout(
-        (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(rpcTimeoutNs)));
-    }
+    retryTimer.newTimeout(t -> locateThenCall(), delayNs, TimeUnit.NANOSECONDS);
   }
 
   private void call(HRegionLocation loc) {
@@ -197,7 +166,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
         err -> conn.getLocator().updateCachedLocation(loc, err));
       return;
     }
-    resetController();
+    resetController(controller, rpcTimeoutNs);
     callable.call(controller, loc, stub).whenComplete((result, error) -> {
       if (error != null) {
         onError(error,

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a6d6aa2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index 94747b9..851fbf1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -100,6 +100,21 @@ public interface AsyncTable {
   long getOperationTimeout(TimeUnit unit);
 
   /**
+   * Set timeout of a single operation in a scan, such as openScanner and next. Will override the
+   * value {@code hbase.client.scanner.timeout.period} in configuration.
+   * <p>
+   * Generally a scan will never timeout after we add heartbeat support unless the region is
+   * crashed. The {@code scanTimeout} works like the {@code operationTimeout} for each single
+   * operation in a scan.
+   */
+  void setScanTimeout(long timeout, TimeUnit unit);
+
+  /**
+   * Get the timeout of a single operation in a scan.
+   */
+  long getScanTimeout(TimeUnit unit);
+
+  /**
    * Test for the existence of columns in the table, as specified by the Get.
    * <p>
    * This will return true if the Get matches one or more keys, false if not.
@@ -335,4 +350,26 @@ public interface AsyncTable {
    *         {@link CompletableFuture}.
    */
   CompletableFuture<List<Result>> smallScan(Scan scan, int limit);
+
+  /**
+   * The basic scan API uses the observer pattern. All results that match the given scan object will
+   * be passed to the given {@code consumer} by calling {@link ScanResultConsumer#onNext(Result[])}.
+   * {@link ScanResultConsumer#onComplete()} means the scan is finished, and
+   * {@link ScanResultConsumer#onError(Throwable)} means we hit an unrecoverable error and the scan
+   * is terminated. {@link ScanResultConsumer#onHeartbeat()} means the RS is still working but we
+   * can not get a valid result to call {@link ScanResultConsumer#onNext(Result[])}. This is usually
+   * because the matched results are too sparse, for example, a filter which almost filters out
+   * everything is specified.
+   * <p>
+   * Notice that, the methods of the given {@code consumer} will be called directly in the rpc
+   * framework's callback thread, so typically you should not do any time consuming work inside
+   * these methods, otherwise you will be likely to block at least one connection to RS(even more if
+   * the rpc framework uses NIO).
+   * <p>
+   * This method is only for experts, do <strong>NOT</strong> use this method if you have other
+   * choice.
+   * @param scan A configured {@link Scan} object.
+   * @param consumer the consumer used to receive results.
+   */
+  void scan(Scan scan, ScanResultConsumer consumer);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a6d6aa2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index ce53775..c5afceb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -25,6 +25,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
@@ -55,6 +57,8 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
 @InterfaceAudience.Private
 class AsyncTableImpl implements AsyncTable {
 
+  private static final Log LOG = LogFactory.getLog(AsyncTableImpl.class);
+
   private final AsyncConnectionImpl conn;
 
   private final TableName tableName;
@@ -348,6 +352,20 @@ class AsyncTableImpl implements AsyncTable {
         .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
   }
 
+  public void scan(Scan scan, ScanResultConsumer consumer) {
+    if (scan.isSmall()) {
+      if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
+        consumer.onError(
+          new IllegalArgumentException("Batch and allowPartial is not allowed for small scan"));
+        return; // onError is terminal for the consumer, so the scanner must not be started
+      }
+      LOG.warn("This is small scan " + scan + ", consider using smallScan directly?");
+    }
+    scan = setDefaultScanConfig(scan);
+    new AsyncClientScanner(scan, consumer, tableName, conn, scanTimeoutNs, readRpcTimeoutNs)
+        .start();
+  }
+
   @Override
   public void setReadRpcTimeout(long timeout, TimeUnit unit) {
     this.readRpcTimeoutNs = unit.toNanos(timeout);
@@ -377,4 +395,14 @@ class AsyncTableImpl implements AsyncTable {
   public long getOperationTimeout(TimeUnit unit) {
     return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
   }
+
+  @Override
+  public void setScanTimeout(long timeout, TimeUnit unit) {
+    this.scanTimeoutNs = unit.toNanos(timeout);
+  }
+
+  @Override
+  public long getScanTimeout(TimeUnit unit) {
+    return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS); // stored in ns, reported in unit
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a6d6aa2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 00ff350..b7bdb83 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.util.Bytes;
  */
 @InterfaceAudience.Private
 public abstract class ClientScanner extends AbstractClientScanner {
+
   private static final Log LOG = LogFactory.getLog(ClientScanner.class);
 
   protected Scan scan;

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a6d6aa2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java
deleted file mode 100644
index 48bfb18..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
-
-/**
- * Get instance of configured Registry.
- */
-@InterfaceAudience.Private
-final class ClusterRegistryFactory {
-
-  static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl";
-
-  private ClusterRegistryFactory() {
-  }
-
-  /**
-   * @return The cluster registry implementation to use.
-   */
-  static AsyncRegistry getRegistry(Configuration conf) {
-    Class<? extends AsyncRegistry> clazz =
-        conf.getClass(REGISTRY_IMPL_CONF_KEY, ZKAsyncRegistry.class, AsyncRegistry.class);
-    return ReflectionUtils.newInstance(clazz, conf);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a6d6aa2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
new file mode 100644
index 0000000..538aecb
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A scan result cache that only returns complete result.
+ */
+@InterfaceAudience.Private
+class CompleteScanResultCache implements ScanResultCache {
+
+  private final List<Result> partialResults = new ArrayList<>();
+
+  private Result combine() throws IOException {
+    Result result = Result.createCompleteResult(partialResults);
+    partialResults.clear();
+    return result;
+  }
+
+  private Result[] prependCombined(Result[] results, int length) throws IOException {
+    Result[] prependResults = new Result[length + 1];
+    prependResults[0] = combine();
+    System.arraycopy(results, 0, prependResults, 1, length);
+    return prependResults;
+  }
+
+  @Override
+  public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
+    // If no results were returned it indicates that either we have all the partial results
+    // necessary to construct the complete result or the server had to send a heartbeat message
+    // to the client to keep the client-server connection alive
+    if (results.length == 0) {
+      // If this response was an empty heartbeat message, then we have not exhausted the region
+      // and thus there may be more partials server side that still need to be added to the partial
+      // list before we form the complete Result
+      if (!partialResults.isEmpty() && !isHeartbeatMessage) {
+        return new Result[] { combine() };
+      }
+      return EMPTY_RESULT_ARRAY;
+    }
+    // In every RPC response there should be at most a single partial result. Furthermore, if
+    // there is a partial result, it is guaranteed to be in the last position of the array.
+    Result last = results[results.length - 1];
+    if (last.isPartial()) {
+      if (partialResults.isEmpty()) {
+        partialResults.add(last);
+        return Arrays.copyOf(results, results.length - 1);
+      }
+      // We have only one result and it is partial
+      if (results.length == 1) {
+        // check if there is a row change
+        if (Bytes.equals(partialResults.get(0).getRow(), last.getRow())) {
+          partialResults.add(last);
+          return EMPTY_RESULT_ARRAY;
+        }
+        Result completeResult = combine();
+        partialResults.add(last);
+        return new Result[] { completeResult };
+      }
+      // We have some complete results
+      Result[] resultsToReturn = prependCombined(results, results.length - 1);
+      partialResults.add(last);
+      return resultsToReturn;
+    }
+    if (!partialResults.isEmpty()) {
+      return prependCombined(results, results.length);
+    }
+    return results;
+  }
+
+  @Override
+  public void clear() {
+    partialResults.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a6d6aa2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 729f874..d464c3b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -24,11 +24,13 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,11 +39,14 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.ipc.RemoteException;
 
 /**
  * Utility used by client connections.
@@ -264,4 +269,25 @@ public final class ConnectionUtils {
   static boolean isEmptyStopRow(byte[] row) {
     return Bytes.equals(row, EMPTY_END_ROW);
   }
+
+  static void resetController(HBaseRpcController controller, long timeoutNs) {
+    controller.reset();
+    if (timeoutNs >= 0) {
+      controller.setCallTimeout(
+        (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(timeoutNs)));
+    }
+  }
+
+  static Throwable translateException(Throwable t) {
+    if (t instanceof UndeclaredThrowableException && t.getCause() != null) {
+      t = t.getCause();
+    }
+    if (t instanceof RemoteException) {
+      t = ((RemoteException) t).unwrapRemoteException();
+    }
+    if (t instanceof ServiceException && t.getCause() != null) {
+      t = translateException(t.getCause());
+    }
+    return t;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a6d6aa2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java
new file mode 100644
index 0000000..2366b57
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Used to separate the row constructing logic.
+ * <p>
+ * After we add heartbeat support for scan, RS may return partial result even if allowPartial is
+ * false and batch is 0. With this interface, the implementation now looks like:
+ * <ol>
+ * <li>Get results from ScanResponse proto.</li>
+ * <li>Pass them to ScanResultCache and get something back.</li>
+ * <li>If we actually get something back, then pass it to ScanResultConsumer.</li>
+ * </ol>
+ */
+@InterfaceAudience.Private
+interface ScanResultCache {
+
+  static final Result[] EMPTY_RESULT_ARRAY = new Result[0];
+
+  /**
+   * Add the given results to cache and get valid results back.
+   * @param results the results of a scan next. Must not be null.
+   * @param isHeartbeatMessage indicates whether the results come from a heartbeat response.
+   * @return valid results, never null.
+   */
+  Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException;
+
+  /**
+   * Clear the cached result if any. Called when the scan hits an error and we will restart from
+   * the start of a row.
+   */
+  void clear();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a6d6aa2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java
new file mode 100644
index 0000000..772a2fb
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * Receives {@link Result} from an asynchronous scanner.
+ * <p>
+ * Notice that the {@link #onNext(Result[])} method will be called in the thread from which we
+ * send requests to the HBase service. So if you want the asynchronous scanner to fetch data from
+ * HBase in the background while you process the returned data, you need to move the processing
+ * work to another thread so that the {@code onNext} call returns immediately. And please do NOT
+ * do any time consuming tasks in any of the methods below unless you know what you are doing.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ScanResultConsumer {
+
+  /**
+   * @param results the data fetched from HBase service.
+   * @return {@code false} if you want to stop the scanner process. Otherwise {@code true}
+   */
+  boolean onNext(Result[] results);
+
+  /**
+   * Indicate that there is a heartbeat message but we have not accumulated enough cells to call
+   * onNext.
+   * <p>
+   * This method gives you a chance to terminate a slow scan operation.
+   * @return {@code false} if you want to stop the scanner process. Otherwise {@code true}
+   */
+  boolean onHeartbeat();
+
+  /**
+   * Indicate that we hit an unrecoverable error and the scan operation is terminated.
+   * <p>
+   * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}.
+   */
+  void onError(Throwable error);
+
+  /**
+   * Indicate that the scan operation is completed normally.
+   */
+  void onComplete();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a6d6aa2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
new file mode 100644
index 0000000..a0792ef
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public abstract class AbstractTestAsyncTableScan {
+
+  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  protected static TableName TABLE_NAME = TableName.valueOf("async");
+
+  protected static byte[] FAMILY = Bytes.toBytes("cf");
+
+  protected static byte[] CQ1 = Bytes.toBytes("cq1");
+
+  protected static byte[] CQ2 = Bytes.toBytes("cq2");
+
+  protected static int COUNT = 1000;
+
+  protected static AsyncConnection ASYNC_CONN;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+    byte[][] splitKeys = new byte[8][];
+    for (int i = 111; i < 999; i += 111) {
+      splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+    }
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    List<CompletableFuture<?>> futures = new ArrayList<>();
+    IntStream.range(0, COUNT).forEach(
+      i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i)))
+          .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))));
+    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    ASYNC_CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  protected abstract Scan createScan();
+
+  protected abstract List<Result> doScan(AsyncTable table, Scan scan) throws Exception;
+
+  @Test
+  public void testScanAll() throws Exception {
+    List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME), createScan());
+    assertEquals(COUNT, results.size());
+    IntStream.range(0, COUNT).forEach(i -> {
+      Result result = results.get(i);
+      assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
+      assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1)));
+    });
+  }
+
+  private void assertResultEquals(Result result, int i) {
+    assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
+    assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1)));
+    assertEquals(i * i, Bytes.toInt(result.getValue(FAMILY, CQ2)));
+  }
+
+  @Test
+  public void testReversedScanAll() throws Exception {
+    List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME), createScan().setReversed(true));
+    assertEquals(COUNT, results.size());
+    IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1));
+  }
+
+  @Test
+  public void testScanNoStopKey() throws Exception {
+    int start = 345;
+    List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME),
+      createScan().setStartRow(Bytes.toBytes(String.format("%03d", start))));
+    assertEquals(COUNT - start, results.size());
+    IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i));
+  }
+
+  @Test
+  public void testReverseScanNoStopKey() throws Exception {
+    int start = 765;
+    List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME),
+      createScan().setStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true));
+    assertEquals(start + 1, results.size());
+    IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i));
+  }
+
+  private void testScan(int start, int stop) throws Exception {
+    List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME),
+      createScan().setStartRow(Bytes.toBytes(String.format("%03d", start)))
+          .setStopRow(Bytes.toBytes(String.format("%03d", stop))));
+    assertEquals(stop - start, results.size());
+    IntStream.range(0, stop - start).forEach(i -> assertResultEquals(results.get(i), start + i));
+  }
+
+  private void testReversedScan(int start, int stop) throws Exception {
+    List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME),
+      createScan().setStartRow(Bytes.toBytes(String.format("%03d", start)))
+          .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setReversed(true));
+    assertEquals(start - stop, results.size());
+    IntStream.range(0, start - stop).forEach(i -> assertResultEquals(results.get(i), start - i));
+  }
+
+  @Test
+  public void testScanWithStartKeyAndStopKey() throws Exception {
+    testScan(345, 567);
+  }
+
+  @Test
+  public void testReversedScanWithStartKeyAndStopKey() throws Exception {
+    testReversedScan(765, 543);
+  }
+
+  @Test
+  public void testScanAtRegionBoundary() throws Exception {
+    testScan(222, 333);
+  }
+
+  @Test
+  public void testReversedScanAtRegionBoundary() throws Exception {
+    testReversedScan(333, 222);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a6d6aa2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
new file mode 100644
index 0000000..fc5ba14
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ SmallTests.class, ClientTests.class })
+public class TestAllowPartialScanResultCache { // checks addAndGet() against hand-built Results
+
+  private static byte[] CF = Bytes.toBytes("cf");
+
+  private AllowPartialScanResultCache resultCache;
+
+  @Before
+  public void setUp() {
+    resultCache = new AllowPartialScanResultCache(); // fresh cache for each test method
+  }
+
+  @After
+  public void tearDown() {
+    resultCache.clear();
+    resultCache = null;
+  }
+
+  private static Cell createCell(int key, int cq) { // row = value = key, qualifier = "cq" + cq
+    return new KeyValue(Bytes.toBytes(key), CF, Bytes.toBytes("cq" + cq), Bytes.toBytes(key));
+  }
+
+  @Test
+  public void test() throws IOException {
+    assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, // the empty array must come back as-is
+      resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false));
+    assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, // same expectation with the flag set to true
+      resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
+
+    Cell[] cells1 = IntStream.range(0, 10).mapToObj(i -> createCell(1, i)).toArray(Cell[]::new);
+    Cell[] cells2 = IntStream.range(0, 10).mapToObj(i -> createCell(2, i)).toArray(Cell[]::new);
+
+    Result[] results1 = resultCache.addAndGet( // feeds cq0..cq4 of row 1
+      new Result[] { Result.create(Arrays.copyOf(cells1, 5), null, false, true) }, false);
+    assertEquals(1, results1.length);
+    assertEquals(1, Bytes.toInt(results1[0].getRow()));
+    assertEquals(5, results1[0].rawCells().length);
+    IntStream.range(0, 5).forEach(
+      i -> assertEquals(1, Bytes.toInt(results1[0].getValue(CF, Bytes.toBytes("cq" + i)))));
+
+    Result[] results2 = resultCache.addAndGet( // feeds cq1..cq9 of row 1, overlapping the above
+      new Result[] { Result.create(Arrays.copyOfRange(cells1, 1, 10), null, false, true) }, false);
+    assertEquals(1, results2.length);
+    assertEquals(1, Bytes.toInt(results2[0].getRow()));
+    assertEquals(5, results2[0].rawCells().length); // only cq5..cq9 are expected back
+    IntStream.range(5, 10).forEach(
+      i -> assertEquals(1, Bytes.toInt(results2[0].getValue(CF, Bytes.toBytes("cq" + i)))));
+
+    Result[] results3 = resultCache // feeds all cells of row 1, then all cells of row 2
+        .addAndGet(new Result[] { Result.create(cells1), Result.create(cells2) }, false);
+    assertEquals(1, results3.length); // only row 2 is expected back
+    assertEquals(2, Bytes.toInt(results3[0].getRow()));
+    assertEquals(10, results3[0].rawCells().length);
+    IntStream.range(0, 10).forEach(
+      i -> assertEquals(2, Bytes.toInt(results3[0].getValue(CF, Bytes.toBytes("cq" + i)))));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a6d6aa2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
new file mode 100644
index 0000000..d21560f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.base.Throwables;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableScan extends AbstractTestAsyncTableScan { // scans via AsyncTable.scan
+
+  private static final class SimpleScanResultConsumer implements ScanResultConsumer {
+
+    private final Queue<Result> queue = new ArrayDeque<>(); // delivered but not yet taken
+
+    private boolean finished; // set by onError and onComplete
+
+    private Throwable error; // assigned only in onError
+
+    @Override
+    public synchronized boolean onNext(Result[] results) {
+      for (Result result : results) {
+        queue.offer(result);
+      }
+      notifyAll(); // wake a thread blocked in take()
+      return true; // presumably means "keep scanning"; confirm in ScanResultConsumer
+    }
+
+    @Override
+    public boolean onHeartbeat() {
+      return true; // presumably means "keep scanning"; confirm in ScanResultConsumer
+    }
+
+    @Override
+    public synchronized void onError(Throwable error) {
+      finished = true;
+      this.error = error;
+      notifyAll(); // wake a thread blocked in take()
+    }
+
+    @Override
+    public synchronized void onComplete() {
+      finished = true;
+      notifyAll(); // wake a thread blocked in take()
+    }
+
+    public synchronized Result take() throws IOException, InterruptedException {
+      for (;;) { // loop until a result is queued or the scan has finished
+        if (!queue.isEmpty()) {
+          return queue.poll(); // queued results are drained before finished/error is looked at
+        }
+        if (finished) {
+          if (error != null) {
+            Throwables.propagateIfPossible(error, IOException.class);
+            throw new IOException(error); // reached only if the line above did not throw
+          } else {
+            return null; // finished without error and nothing left in the queue
+          }
+        }
+        wait(); // released by notifyAll() in onNext/onError/onComplete
+      }
+    }
+  }
+
+  @Parameter
+  public Supplier<Scan> scanCreater; // one value per entry returned by params()
+
+  @Parameters
+  public static List<Object[]> params() { // two runs: plain scan and batch-of-1 scan
+    return Arrays.asList(new Supplier<?>[] { TestAsyncTableScan::createNormalScan },
+      new Supplier<?>[] { TestAsyncTableScan::createBatchScan });
+  }
+
+  private static Scan createNormalScan() {
+    return new Scan();
+  }
+
+  private static Scan createBatchScan() {
+    return new Scan().setBatch(1); // doScan() pairs the resulting pieces back together
+  }
+
+  @Override
+  protected Scan createScan() {
+    return scanCreater.get();
+  }
+
+  private Result convertToPartial(Result result) { // copy with the last create() flag set to true
+    return Result.create(result.rawCells(), result.getExists(), result.isStale(), true);
+  }
+
+  @Override
+  protected List<Result> doScan(AsyncTable table, Scan scan) throws Exception {
+    SimpleScanResultConsumer scanObserver = new SimpleScanResultConsumer();
+    table.scan(scan, scanObserver);
+    List<Result> results = new ArrayList<>();
+    for (Result result; (result = scanObserver.take()) != null;) { // null marks the end
+      results.add(result);
+    }
+    if (scan.getBatch() > 0) { // batched run: merge each consecutive pair into one Result
+      assertTrue(results.size() % 2 == 0);
+      return IntStream.range(0, results.size() / 2).mapToObj(i -> {
+        try {
+          return Result.createCompleteResult(Arrays.asList(convertToPartial(results.get(2 * i)),
+            convertToPartial(results.get(2 * i + 1))));
+        } catch (IOException e) {
+          throw new UncheckedIOException(e); // lambda cannot throw the checked IOException
+        }
+      }).collect(Collectors.toList());
+    }
+    return results;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a6d6aa2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
index 972780e..e920013 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java
@@ -19,166 +19,18 @@ package org.apache.hadoop.hbase.client;
 
 import static org.junit.Assert.assertEquals;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.IntStream;
 
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 @Category({ MediumTests.class, ClientTests.class })
-public class TestAsyncTableSmallScan {
-
-  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  private static TableName TABLE_NAME = TableName.valueOf("async");
-
-  private static byte[] FAMILY = Bytes.toBytes("cf");
-
-  private static byte[] QUALIFIER = Bytes.toBytes("cq");
-
-  private static int COUNT = 1000;
-
-  private static AsyncConnection ASYNC_CONN;
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    TEST_UTIL.startMiniCluster(3);
-    byte[][] splitKeys = new byte[8][];
-    for (int i = 111; i < 999; i += 111) {
-      splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
-    }
-    TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
-    TEST_UTIL.waitTableAvailable(TABLE_NAME);
-    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
-    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
-    List<CompletableFuture<?>> futures = new ArrayList<>();
-    IntStream.range(0, COUNT)
-        .forEach(i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i)))
-            .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))));
-    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
-  }
-
-  @AfterClass
-  public static void tearDown() throws Exception {
-    ASYNC_CONN.close();
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  @Test
-  public void testScanAll() throws InterruptedException, ExecutionException {
-    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
-    List<Result> results = table.smallScan(new Scan().setSmall(true)).get();
-    assertEquals(COUNT, results.size());
-    IntStream.range(0, COUNT).forEach(i -> {
-      Result result = results.get(i);
-      assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
-      assertEquals(i, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
-    });
-  }
-
-  @Test
-  public void testReversedScanAll() throws InterruptedException, ExecutionException {
-    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
-    List<Result> results = table.smallScan(new Scan().setSmall(true).setReversed(true)).get();
-    assertEquals(COUNT, results.size());
-    IntStream.range(0, COUNT).forEach(i -> {
-      Result result = results.get(i);
-      int actualIndex = COUNT - i - 1;
-      assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
-      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
-    });
-  }
-
-  @Test
-  public void testScanNoStopKey() throws InterruptedException, ExecutionException {
-    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
-    int start = 345;
-    List<Result> results = table
-        .smallScan(new Scan(Bytes.toBytes(String.format("%03d", start))).setSmall(true)).get();
-    assertEquals(COUNT - start, results.size());
-    IntStream.range(0, COUNT - start).forEach(i -> {
-      Result result = results.get(i);
-      int actualIndex = start + i;
-      assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
-      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
-    });
-  }
-
-  @Test
-  public void testReverseScanNoStopKey() throws InterruptedException, ExecutionException {
-    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
-    int start = 765;
-    List<Result> results = table
-        .smallScan(
-          new Scan(Bytes.toBytes(String.format("%03d", start))).setSmall(true).setReversed(true))
-        .get();
-    assertEquals(start + 1, results.size());
-    IntStream.range(0, start + 1).forEach(i -> {
-      Result result = results.get(i);
-      int actualIndex = start - i;
-      assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
-      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
-    });
-  }
-
-  private void testScan(int start, int stop) throws InterruptedException, ExecutionException {
-    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
-    List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
-        .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true)).get();
-    assertEquals(stop - start, results.size());
-    IntStream.range(0, stop - start).forEach(i -> {
-      Result result = results.get(i);
-      int actualIndex = start + i;
-      assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
-      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
-    });
-  }
-
-  private void testReversedScan(int start, int stop)
-      throws InterruptedException, ExecutionException {
-    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
-    List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
-        .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true).setReversed(true))
-        .get();
-    assertEquals(start - stop, results.size());
-    IntStream.range(0, start - stop).forEach(i -> {
-      Result result = results.get(i);
-      int actualIndex = start - i;
-      assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
-      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
-    });
-  }
-
-  @Test
-  public void testScanWithStartKeyAndStopKey() throws InterruptedException, ExecutionException {
-    testScan(345, 567);
-  }
-
-  @Test
-  public void testReversedScanWithStartKeyAndStopKey()
-      throws InterruptedException, ExecutionException {
-    testReversedScan(765, 543);
-  }
-
-  @Test
-  public void testScanAtRegionBoundary() throws InterruptedException, ExecutionException {
-    testScan(222, 333);
-  }
-
-  @Test
-  public void testReversedScanAtRegionBoundary() throws InterruptedException, ExecutionException {
-    testScan(222, 333);
-  }
+public class TestAsyncTableSmallScan extends AbstractTestAsyncTableScan {
 
   @Test
   public void testScanWithLimit() throws InterruptedException, ExecutionException {
@@ -194,7 +46,7 @@ public class TestAsyncTableSmallScan {
       Result result = results.get(i);
       int actualIndex = start + i;
       assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
-      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
+      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1)));
     });
   }
 
@@ -213,7 +65,17 @@ public class TestAsyncTableSmallScan {
       Result result = results.get(i);
       int actualIndex = start - i;
       assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
-      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
+      assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1)));
     });
   }
+
+  @Override
+  protected Scan createScan() {
+    return new Scan().setSmall(true); // scans built through this hook are flagged as small
+  }
+
+  @Override
+  protected List<Result> doScan(AsyncTable table, Scan scan) throws Exception {
+    return table.smallScan(scan).get(); // presumably blocks on a future; confirm in AsyncTable
+  }
 }