You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/03/17 01:33:22 UTC

hbase git commit: HBASE-17793 Backport ScanResultCache related code to branch-1

Repository: hbase
Updated Branches:
  refs/heads/branch-1 d542b446b -> 6be8d2041


HBASE-17793 Backport ScanResultCache related code to branch-1


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6be8d204
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6be8d204
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6be8d204

Branch: refs/heads/branch-1
Commit: 6be8d2041b9dfb99c77d824776fb39da8984bea1
Parents: d542b44
Author: zhangduo <zh...@apache.org>
Authored: Thu Mar 16 15:25:56 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Fri Mar 17 09:32:44 2017 +0800

----------------------------------------------------------------------
 .../client/AllowPartialScanResultCache.java     |  73 ++++++
 .../hbase/client/BatchScanResultCache.java      | 142 +++++++++++
 .../hadoop/hbase/client/ClientScanner.java      | 254 +------------------
 .../hbase/client/CompleteScanResultCache.java   | 110 ++++++++
 .../hadoop/hbase/client/ConnectionUtils.java    |  48 ++++
 .../org/apache/hadoop/hbase/client/Result.java  |  69 +++--
 .../hadoop/hbase/client/ScanResultCache.java    |  53 ++++
 .../client/TestAllowPartialScanResultCache.java |  91 +++++++
 .../hbase/client/TestBatchScanResultCache.java  | 113 +++++++++
 .../TestCompleteResultScanResultCache.java      | 182 +++++++++++++
 10 files changed, 858 insertions(+), 277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6be8d204/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
new file mode 100644
index 0000000..82f1ea0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A ScanResultCache that may return partial result.
+ * <p>
+ * As a scan can only be restarted from the beginning of a row after an error, this class also
+ * implements the logic that skips the cells that have already been returned.
+ */
+@InterfaceAudience.Private
+class AllowPartialScanResultCache implements ScanResultCache {
+
+  // Used to filter out the cells that have already been returned to the user, as we always
+  // restart from the beginning of a row when retrying.
+  private Cell lastCell;
+
+  private void updateLastCell(Result result) {
+    lastCell = result.rawCells()[result.rawCells().length - 1];
+  }
+
+  @Override
+  public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
+    if (results.length == 0) {
+      return EMPTY_RESULT_ARRAY;
+    }
+    int i;
+    for (i = 0; i < results.length; i++) {
+      Result r = filterCells(results[i], lastCell);
+      if (r != null) {
+        results[i] = r;
+        break;
+      }
+    }
+    if (i == results.length) {
+      return EMPTY_RESULT_ARRAY;
+    }
+    updateLastCell(results[results.length - 1]);
+    if (i > 0) {
+      return Arrays.copyOfRange(results, i, results.length);
+    } else {
+      return results;
+    }
+  }
+
+  @Override
+  public void clear() {
+    // we do not cache anything
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/6be8d204/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
new file mode 100644
index 0000000..9ab959b
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A scan result cache for batched scans, i.e.,
+ * {@code scan.getBatch() > 0 && !scan.getAllowPartialResults()}.
+ * <p>
+ * If the user calls setBatch(5) and the RPC returns 3+5+5+5+3 cells, we should return 5+5+5+5+1
+ * to the user. setBatch doesn't imply setAllowPartialResults(true).
+ */
+@InterfaceAudience.Private
+public class BatchScanResultCache implements ScanResultCache {
+
+  private final int batch;
+
+  // Used to filter out the cells that have already been returned to the user, as we always
+  // restart from the beginning of a row when retrying.
+  private Cell lastCell;
+
+  private final Deque<Result> partialResults = new ArrayDeque<>();
+
+  private int numCellsOfPartialResults;
+
+  public BatchScanResultCache(int batch) {
+    this.batch = batch;
+  }
+
+  private void updateLastCell(Result result) {
+    lastCell = result.rawCells()[result.rawCells().length - 1];
+  }
+
+  private Result createCompletedResult() throws IOException {
+    Result result = Result.createCompleteResult(partialResults);
+    partialResults.clear();
+    numCellsOfPartialResults = 0;
+    return result;
+  }
+
+  // Add the new result to the partial list and return a batched Result if the number of cached
+  // cells reaches the batch limit. As the RS also respects scan.getBatch, we can be sure that we
+  // will get at most one Result back (or null, which means we do not have enough cells yet).
+  private Result regroupResults(Result result) {
+    partialResults.addLast(result);
+    numCellsOfPartialResults += result.size();
+    if (numCellsOfPartialResults < batch) {
+      return null;
+    }
+    Cell[] cells = new Cell[batch];
+    int cellCount = 0;
+    boolean stale = false;
+    for (;;) {
+      Result r = partialResults.pollFirst();
+      stale = stale || r.isStale();
+      int newCellCount = cellCount + r.size();
+      if (newCellCount > batch) {
+        // We have more cells than expected, so split the current result
+        int len = batch - cellCount;
+        System.arraycopy(r.rawCells(), 0, cells, cellCount, len);
+        Cell[] remainingCells = new Cell[r.size() - len];
+        System.arraycopy(r.rawCells(), len, remainingCells, 0, r.size() - len);
+        partialResults.addFirst(
+          Result.create(remainingCells, r.getExists(), r.isStale(), r.mayHaveMoreCellsInRow()));
+        break;
+      }
+      System.arraycopy(r.rawCells(), 0, cells, cellCount, r.size());
+      if (newCellCount == batch) {
+        break;
+      }
+      cellCount = newCellCount;
+    }
+    numCellsOfPartialResults -= batch;
+    return Result.create(cells, null, stale,
+      result.mayHaveMoreCellsInRow() || !partialResults.isEmpty());
+  }
+
+  @Override
+  public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
+    if (results.length == 0) {
+      if (!partialResults.isEmpty() && !isHeartbeatMessage) {
+        return new Result[] { createCompletedResult() };
+      }
+      return EMPTY_RESULT_ARRAY;
+    }
+    List<Result> regroupedResults = new ArrayList<>();
+    for (Result result : results) {
+      result = filterCells(result, lastCell);
+      if (result == null) {
+        continue;
+      }
+      // check if we have a row change
+      if (!partialResults.isEmpty() &&
+          !Bytes.equals(partialResults.peek().getRow(), result.getRow())) {
+        regroupedResults.add(createCompletedResult());
+      }
+      Result regroupedResult = regroupResults(result);
+      if (regroupedResult != null) {
+        regroupedResults.add(regroupedResult);
+        // only update last cell when we actually return it to user.
+        updateLastCell(regroupedResult);
+      }
+      if (!result.mayHaveMoreCellsInRow() && !partialResults.isEmpty()) {
+        // We are done for this row
+        regroupedResults.add(createCompletedResult());
+      }
+    }
+    return regroupedResults.toArray(new Result[0]);
+  }
+
+  @Override
+  public void clear() {
+    partialResults.clear();
+    numCellsOfPartialResults = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/6be8d204/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 01ea993..57586d8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -17,16 +17,15 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows;
 
 import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.commons.lang.mutable.MutableBoolean;
@@ -34,13 +33,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue.MetaComparator;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.UnknownScannerException;
@@ -69,23 +66,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
   protected HRegionInfo currentRegion = null;
   protected ScannerCallableWithReplicas callable = null;
   protected final LinkedList<Result> cache = new LinkedList<Result>();
-  /**
-   * A list of partial results that have been returned from the server. This list should only
-   * contain results if this scanner does not have enough partial results to form the complete
-   * result.
-   */
-  protected final LinkedList<Result> partialResults = new LinkedList<Result>();
-  protected int partialResultsCellSizes = 0;
-  /**
-   * The row for which we are accumulating partial Results (i.e. the row of the Results stored
-   * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync via
-   * the methods {@link #regroupResults(Result)} and {@link #clearPartialResults()}
-   */
-  protected byte[] partialResultsRow = null;
-  /**
-   * The last cell from a not full Row which is added to cache
-   */
-  protected Cell lastCellLoadedToCache = null;
+  private final ScanResultCache scanResultCache;
   protected final int caching;
   protected long lastNext;
   // Keep lastResult returned successfully in case we have to reset scanner.
@@ -106,7 +87,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
   protected final int primaryOperationTimeout;
   private int retries;
   protected final ExecutorService pool;
-  private static MetaComparator metaComparator = new MetaComparator();
 
   /**
    * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
@@ -159,6 +139,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
     this.rpcControllerFactory = controllerFactory;
 
     this.conf = conf;
+    this.scanResultCache = createScanResultCache(scan);
   }
 
   protected ClusterConnection getConnection() {
@@ -361,14 +342,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
 
   private void closeScannerIfExhausted(boolean exhausted) throws IOException {
     if (exhausted) {
-      if (!partialResults.isEmpty()) {
-        // XXX: continue if there are partial results. But in fact server should not set
-        // hasMoreResults to false if there are partial results.
-        LOG.warn("Server tells us there is no more results for this region but we still have" +
-            " partialResults, this should not happen, retry on the current scanner anyway");
-      } else {
-        closeScanner();
-      }
+      closeScanner();
     }
   }
 
@@ -376,7 +350,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
       MutableBoolean retryAfterOutOfOrderException, int retriesLeft) throws DoNotRetryIOException {
     // An exception was thrown which makes any partial results that we were collecting
     // invalid. The scanner will need to be reset to the beginning of a row.
-    clearPartialResults();
+    scanResultCache.clear();
 
     // Unfortunately, DNRIOE is used in two different semantics.
     // (1) The first is to close the client scanner and bubble up the exception all the way
@@ -470,7 +444,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
         if (callable.switchedToADifferentReplica()) {
           // Any accumulated partial results are no longer valid since the callable will
           // openScanner with the correct startkey and we must pick up from there
-          clearPartialResults();
+          scanResultCache.clear();
           this.currentRegion = callable.getHRegionInfo();
         }
         retryAfterOutOfOrderException.setValue(true);
@@ -490,28 +464,20 @@ public abstract class ClientScanner extends AbstractClientScanner {
       // Groom the array of Results that we received back from the server before adding that
       // Results to the scanner's cache. If partial results are not allowed to be seen by the
       // caller, all book keeping will be performed within this method.
-      List<Result> resultsToAddToCache =
-          getResultsToAddToCache(values, callable.isHeartbeatMessage());
-      if (!resultsToAddToCache.isEmpty()) {
+      Result[] resultsToAddToCache =
+          scanResultCache.addAndGet(values, callable.isHeartbeatMessage());
+      if (resultsToAddToCache.length > 0) {
         for (Result rs : resultsToAddToCache) {
-          rs = filterLoadedCell(rs);
-          if (rs == null) {
-            continue;
-          }
           cache.add(rs);
           for (Cell cell : rs.rawCells()) {
             remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
           }
           countdown--;
           this.lastResult = rs;
-          if (this.lastResult.mayHaveMoreCellsInRow()) {
-            updateLastCellLoadedToCache(this.lastResult);
-          } else {
-            this.lastCellLoadedToCache = null;
-          }
         }
-        if (scan.getLimit() > 0 && !resultsToAddToCache.isEmpty()) {
-          int newLimit = scan.getLimit() - numberOfIndividualRows(resultsToAddToCache);
+        if (scan.getLimit() > 0) {
+          int newLimit =
+              scan.getLimit() - numberOfIndividualRows(Arrays.asList(resultsToAddToCache));
           assert newLimit >= 0;
           scan.setLimit(newLimit);
         }
@@ -554,13 +520,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
       }
       // we are done with the current region
       if (regionExhausted) {
-        if (!partialResults.isEmpty()) {
-          // XXX: continue if there are partial results. But in fact server should not set
-          // hasMoreResults to false if there are partial results.
-          LOG.warn("Server tells us there is no more results for this region but we still have" +
-              " partialResults, this should not happen, retry on the current scanner anyway");
-          continue;
-        }
         if (!moveToNextRegion()) {
           break;
         }
@@ -568,141 +527,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
     }
   }
 
-  /**
-   * This method ensures all of our book keeping regarding partial results is kept up to date. This
-   * method should be called once we know that the results we received back from the RPC request do
-   * not contain errors. We return a list of results that should be added to the cache. In general,
-   * this list will contain all NON-partial results from the input array (unless the client has
-   * specified that they are okay with receiving partial results)
-   * @param resultsFromServer The array of {@link Result}s returned from the server
-   * @param heartbeatMessage Flag indicating whether or not the response received from the server
-   *          represented a complete response, or a heartbeat message that was sent to keep the
-   *          client-server connection alive
-   * @return the list of results that should be added to the cache.
-   * @throws IOException
-   */
-  protected List<Result> getResultsToAddToCache(Result[] resultsFromServer,
-      boolean heartbeatMessage) throws IOException {
-    int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
-    List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
-
-    // If the caller has indicated in their scan that they are okay with seeing partial results,
-    // then simply add all results to the list. Note allowPartial and setBatch are not same, we can
-    // return here if allow partials and we will handle batching later.
-    if (scan.getAllowPartialResults()) {
-      addResultsToList(resultsToAddToCache, resultsFromServer, 0,
-        (null == resultsFromServer ? 0 : resultsFromServer.length));
-      return resultsToAddToCache;
-    }
-
-    // If no results were returned it indicates that either we have the all the partial results
-    // necessary to construct the complete result or the server had to send a heartbeat message
-    // to the client to keep the client-server connection alive
-    if (resultsFromServer == null || resultsFromServer.length == 0) {
-      // If this response was an empty heartbeat message, then we have not exhausted the region
-      // and thus there may be more partials server side that still need to be added to the partial
-      // list before we form the complete Result
-      if (!partialResults.isEmpty() && !heartbeatMessage) {
-        resultsToAddToCache.add(Result.createCompleteResult(partialResults));
-        clearPartialResults();
-      }
-
-      return resultsToAddToCache;
-    }
-
-    for(Result result : resultsFromServer) {
-      if (partialResultsRow != null && Bytes.compareTo(result.getRow(), partialResultsRow) != 0) {
-        // We have a new row, complete the previous row.
-        resultsToAddToCache.add(Result.createCompleteResult(partialResults));
-        clearPartialResults();
-      }
-      Result res = regroupResults(result);
-      if (res != null) {
-        resultsToAddToCache.add(res);
-      }
-      if (!result.mayHaveMoreCellsInRow()) {
-        // We are done for this row
-        if (partialResultsCellSizes > 0) {
-          resultsToAddToCache.add(Result.createCompleteResult(partialResults));
-        }
-        clearPartialResults();
-      }
-    }
-
-    return resultsToAddToCache;
-  }
-
-  /**
-   * Add new result to the partial list and return a batched Result if caching size exceed
-   * batching limit.
-   * If user setBatch(5) and rpc returns 3+5+5+5+3 cells, we should return 5+5+5+5+1 to user.
-   * setBatch doesn't mean setAllowPartialResult(true)
-   * @param result The result that we want to add to our list of partial Results
-   * @return the result if we have batch limit and there is one Result can be returned to user, or
-   *         null if we have not.
-   * @throws IOException
-   */
-  private Result regroupResults(final Result result) throws IOException {
-    partialResultsRow = result.getRow();
-    partialResults.add(result);
-    partialResultsCellSizes += result.size();
-    if (scan.getBatch() > 0 && partialResultsCellSizes >= scan.getBatch()) {
-      Cell[] cells = new Cell[scan.getBatch()];
-      int count = 0;
-      boolean stale = false;
-      while (count < scan.getBatch()) {
-        Result res = partialResults.poll();
-        stale = stale || res.isStale();
-        if (res.size() + count <= scan.getBatch()) {
-          System.arraycopy(res.rawCells(), 0, cells, count, res.size());
-          count += res.size();
-        } else {
-          int len = scan.getBatch() - count;
-          System.arraycopy(res.rawCells(), 0, cells, count, len);
-          Cell[] remainingCells = new Cell[res.size() - len];
-          System.arraycopy(res.rawCells(), len, remainingCells, 0, res.size() - len);
-          Result remainingRes = Result.create(remainingCells, res.getExists(), res.isStale(),
-              res.mayHaveMoreCellsInRow());
-          partialResults.addFirst(remainingRes);
-          count = scan.getBatch();
-        }
-      }
-      partialResultsCellSizes -= scan.getBatch();
-      if (partialResultsCellSizes == 0) {
-        // We have nothing in partialResults, clear the flags to prevent returning empty Result
-        // when next result belongs to the next row.
-        clearPartialResults();
-      }
-      return Result.create(cells, null, stale,
-          partialResultsCellSizes > 0 || result.mayHaveMoreCellsInRow());
-    }
-    return null;
-  }
-
-  /**
-   * Convenience method for clearing the list of partials and resetting the partialResultsRow.
-   */
-  private void clearPartialResults() {
-    partialResults.clear();
-    partialResultsCellSizes = 0;
-    partialResultsRow = null;
-  }
-
-  /**
-   * Helper method for adding results between the indices [start, end) to the outputList
-   * @param outputList the list that results will be added to
-   * @param inputArray the array that results are taken from
-   * @param start beginning index (inclusive)
-   * @param end ending index (exclusive)
-   */
-  private void addResultsToList(List<Result> outputList, Result[] inputArray, int start, int end) {
-    if (inputArray == null || start < 0 || end > inputArray.length) return;
-
-    for (int i = start; i < end; i++) {
-      outputList.add(inputArray[i]);
-    }
-  }
-
   @Override
   public void close() {
     if (!scanMetricsPublished) writeScanMetrics();
@@ -739,58 +563,4 @@ public abstract class ClientScanner extends AbstractClientScanner {
     }
     return false;
   }
-
-  protected void updateLastCellLoadedToCache(Result result) {
-    if (result.rawCells().length == 0) {
-      return;
-    }
-    this.lastCellLoadedToCache = result.rawCells()[result.rawCells().length - 1];
-  }
-
-  /**
-   * Compare two Cells considering reversed scanner. ReversedScanner only reverses rows, not
-   * columns.
-   */
-  private int compare(Cell a, Cell b) {
-    int r = 0;
-    if (currentRegion != null && currentRegion.isMetaRegion()) {
-      r = metaComparator.compareRows(a, b);
-    } else {
-      r = CellComparator.compareRows(a, b);
-    }
-    if (r != 0) {
-      return this.scan.isReversed() ? -r : r;
-    }
-    return CellComparator.compareWithoutRow(a, b);
-  }
-
-  private Result filterLoadedCell(Result result) {
-    // we only filter result when last result is partial
-    // so lastCellLoadedToCache and result should have same row key.
-    // However, if 1) read some cells; 1.1) delete this row at the same time 2) move region;
-    // 3) read more cell. lastCellLoadedToCache and result will be not at same row.
-    if (lastCellLoadedToCache == null || result.rawCells().length == 0) {
-      return result;
-    }
-    if (compare(this.lastCellLoadedToCache, result.rawCells()[0]) < 0) {
-      // The first cell of this result is larger than the last cell of loadcache.
-      // If user do not allow partial result, it must be true.
-      return result;
-    }
-    if (compare(this.lastCellLoadedToCache, result.rawCells()[result.rawCells().length - 1]) >= 0) {
-      // The last cell of this result is smaller than the last cell of loadcache, skip all.
-      return null;
-    }
-
-    // The first one must not in filtered result, we start at the second.
-    int index = 1;
-    while (index < result.rawCells().length) {
-      if (compare(this.lastCellLoadedToCache, result.rawCells()[index]) < 0) {
-        break;
-      }
-      index++;
-    }
-    Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
-    return Result.create(list, result.getExists(), result.isStale(), result.mayHaveMoreCellsInRow());
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6be8d204/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
new file mode 100644
index 0000000..e09ddfb
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A scan result cache that only returns complete result.
+ */
+@InterfaceAudience.Private
+class CompleteScanResultCache implements ScanResultCache {
+
+  private final List<Result> partialResults = new ArrayList<>();
+
+  private Result combine() throws IOException {
+    Result result = Result.createCompleteResult(partialResults);
+    partialResults.clear();
+    return result;
+  }
+
+  private Result[] prependCombined(Result[] results, int length) throws IOException {
+    if (length == 0) {
+      return new Result[] { combine() };
+    }
+    // the last part of a partial result may not be marked as partial so here we need to check if
+    // there is a row change.
+    int start;
+    if (Bytes.equals(partialResults.get(0).getRow(), results[0].getRow())) {
+      partialResults.add(results[0]);
+      start = 1;
+      length--;
+    } else {
+      start = 0;
+    }
+    Result[] prependResults = new Result[length + 1];
+    prependResults[0] = combine();
+    System.arraycopy(results, start, prependResults, 1, length);
+    return prependResults;
+  }
+
+  @Override
+  public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
+    // If no results were returned it indicates that either we have all the partial results
+    // necessary to construct the complete result or the server had to send a heartbeat message
+    // to the client to keep the client-server connection alive
+    if (results.length == 0) {
+      // If this response was an empty heartbeat message, then we have not exhausted the region
+      // and thus there may be more partials server side that still need to be added to the partial
+      // list before we form the complete Result
+      if (!partialResults.isEmpty() && !isHeartbeatMessage) {
+        return new Result[] { combine() };
+      }
+      return EMPTY_RESULT_ARRAY;
+    }
+    // In every RPC response there should be at most a single partial result. Furthermore, if
+    // there is a partial result, it is guaranteed to be in the last position of the array.
+    Result last = results[results.length - 1];
+    if (last.mayHaveMoreCellsInRow()) {
+      if (partialResults.isEmpty()) {
+        partialResults.add(last);
+        return Arrays.copyOf(results, results.length - 1);
+      }
+      // We have only one result and it is partial
+      if (results.length == 1) {
+        // check if there is a row change
+        if (Bytes.equals(partialResults.get(0).getRow(), last.getRow())) {
+          partialResults.add(last);
+          return EMPTY_RESULT_ARRAY;
+        }
+        Result completeResult = combine();
+        partialResults.add(last);
+        return new Result[] { completeResult };
+      }
+      // We have some complete results
+      Result[] resultsToReturn = prependCombined(results, results.length - 1);
+      partialResults.add(last);
+      return resultsToReturn;
+    }
+    if (!partialResults.isEmpty()) {
+      return prependCombined(results, results.length);
+    }
+    return results;
+  }
+
+  @Override
+  public void clear() {
+    partialResults.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/6be8d204/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 97f71e7..1bdc5fe 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -24,12 +24,16 @@ import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
@@ -239,6 +243,40 @@ public class ConnectionUtils {
     return Bytes.equals(row, EMPTY_END_ROW);
   }
 
+  private static final Comparator<Cell> COMPARE_WITHOUT_ROW = new Comparator<Cell>() {
+
+    @Override
+    public int compare(Cell o1, Cell o2) {
+      return CellComparator.compareWithoutRow(o1, o2);
+    }
+  };
+
+  static Result filterCells(Result result, Cell keepCellsAfter) {
+    if (keepCellsAfter == null) {
+      // do not need to filter
+      return result;
+    }
+    // not the same row
+    if (!CellUtil.matchingRow(keepCellsAfter, result.getRow(), 0, result.getRow().length)) {
+      return result;
+    }
+    Cell[] rawCells = result.rawCells();
+    int index = Arrays.binarySearch(rawCells, keepCellsAfter, COMPARE_WITHOUT_ROW);
+    if (index < 0) {
+      index = -index - 1;
+    } else {
+      index++;
+    }
+    if (index == 0) {
+      return result;
+    }
+    if (index == rawCells.length) {
+      return null;
+    }
+    return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null,
+      result.isStale(), result.mayHaveMoreCellsInRow());
+  }
+
   static boolean noMoreResultsForScan(Scan scan, HRegionInfo info) {
     if (isEmptyStopRow(info.getEndKey())) {
       return true;
@@ -287,4 +325,14 @@ public class ConnectionUtils {
     }
     return count;
   }
+
+  public static ScanResultCache createScanResultCache(Scan scan) {
+    if (scan.getAllowPartialResults()) {
+      return new AllowPartialScanResultCache();
+    } else if (scan.getBatch() > 0) {
+      return new BatchScanResultCache(scan.getBatch());
+    } else {
+      return new CompleteScanResultCache();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6be8d204/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
index 3483d26..4c67c50 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -24,7 +24,9 @@ import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -34,7 +36,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -857,44 +858,42 @@ public class Result implements CellScannable, CellScanner {
    * @throws IOException A complete result cannot be formed because the results in the partial list
    *           come from different rows
    */
-  public static Result createCompleteResult(List<Result> partialResults)
+  public static Result createCompleteResult(Iterable<Result> partialResults)
       throws IOException {
-    List<Cell> cells = new ArrayList<Cell>();
+    if (partialResults == null) {
+      return Result.create(Collections.<Cell> emptyList(), null, false);
+    }
+    List<Cell> cells = new ArrayList<>();
     boolean stale = false;
     byte[] prevRow = null;
     byte[] currentRow = null;
-
-    if (partialResults != null && !partialResults.isEmpty()) {
-      for (int i = 0; i < partialResults.size(); i++) {
-        Result r = partialResults.get(i);
-        currentRow = r.getRow();
-        if (prevRow != null && !Bytes.equals(prevRow, currentRow)) {
-          throw new IOException(
-              "Cannot form complete result. Rows of partial results do not match." +
-                  " Partial Results: " + partialResults);
-        }
-
-        // Ensure that all Results except the last one are marked as partials. The last result
-        // may not be marked as a partial because Results are only marked as partials when
-        // the scan on the server side must be stopped due to reaching the maxResultSize.
-        // Visualizing it makes it easier to understand:
-        // maxResultSize: 2 cells
-        // (-x-) represents cell number x in a row
-        // Example: row1: -1- -2- -3- -4- -5- (5 cells total)
-        // How row1 will be returned by the server as partial Results:
-        // Result1: -1- -2- (2 cells, size limit reached, mark as partial)
-        // Result2: -3- -4- (2 cells, size limit reached, mark as partial)
-        // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
-        if (i != (partialResults.size() - 1) && !r.mayHaveMoreCellsInRow()) {
-          throw new IOException(
-              "Cannot form complete result. Result is missing partial flag. " +
-                  "Partial Results: " + partialResults);
-        }
-        prevRow = currentRow;
-        stale = stale || r.isStale();
-        for (Cell c : r.rawCells()) {
-          cells.add(c);
-        }
+    for (Iterator<Result> iter = partialResults.iterator(); iter.hasNext();) {
+      Result r = iter.next();
+      currentRow = r.getRow();
+      if (prevRow != null && !Bytes.equals(prevRow, currentRow)) {
+        throw new IOException(
+            "Cannot form complete result. Rows of partial results do not match." +
+                " Partial Results: " + partialResults);
+      }
+      // Ensure that all Results except the last one are marked as partials. The last result
+      // may not be marked as a partial because Results are only marked as partials when
+      // the scan on the server side must be stopped due to reaching the maxResultSize.
+      // Visualizing it makes it easier to understand:
+      // maxResultSize: 2 cells
+      // (-x-) represents cell number x in a row
+      // Example: row1: -1- -2- -3- -4- -5- (5 cells total)
+      // How row1 will be returned by the server as partial Results:
+      // Result1: -1- -2- (2 cells, size limit reached, mark as partial)
+      // Result2: -3- -4- (2 cells, size limit reached, mark as partial)
+      // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
+      if (iter.hasNext() && !r.mayHaveMoreCellsInRow()) {
+        throw new IOException("Cannot form complete result. Result is missing partial flag. " +
+            "Partial Results: " + partialResults);
+      }
+      prevRow = currentRow;
+      stale = stale || r.isStale();
+      for (Cell c : r.rawCells()) {
+        cells.add(c);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/6be8d204/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java
new file mode 100644
index 0000000..2366b57
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Used to separate the row constructing logic.
+ * <p>
+ * Since heartbeat support was added for scan, the RS may return partial results even if
+ * allowPartial is false and batch is 0. With this interface, the implementation now looks like:
+ * <ol>
+ * <li>Get results from ScanResponse proto.</li>
+ * <li>Pass them to ScanResultCache and get something back.</li>
+ * <li>If we actually get something back, then pass it to ScanObserver.</li>
+ * </ol>
+ */
+@InterfaceAudience.Private
+interface ScanResultCache {
+
+  static final Result[] EMPTY_RESULT_ARRAY = new Result[0];
+
+  /**
+   * Add the given results to cache and get valid results back.
+   * @param results the results of a scan next. Must not be null.
+   * @param isHeartbeatMessage indicates whether the results come from a heartbeat response.
+   * @return valid results, never null.
+   */
+  Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException;
+
+  /**
+   * Clear the cached result, if any. Called when a scan error occurs and we will start again from
+   * the start of a row.
+   */
+  void clear();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/6be8d204/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
new file mode 100644
index 0000000..3fe43a5
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.TestBatchScanResultCache.createCells;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ SmallTests.class, ClientTests.class })
+public class TestAllowPartialScanResultCache {
+
+  private static byte[] CF = Bytes.toBytes("cf");
+
+  private AllowPartialScanResultCache resultCache;
+
+  @Before
+  public void setUp() {
+    resultCache = new AllowPartialScanResultCache();
+  }
+
+  @After
+  public void tearDown() {
+    resultCache.clear();
+    resultCache = null;
+  }
+
+  @Test
+  public void test() throws IOException {
+    assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
+      resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false));
+    assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
+      resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
+
+    Cell[] cells1 = createCells(CF, 1, 10);
+    Cell[] cells2 = createCells(CF, 2, 10);
+
+    Result[] results1 = resultCache.addAndGet(
+      new Result[] { Result.create(Arrays.copyOf(cells1, 5), null, false, true) }, false);
+    assertEquals(1, results1.length);
+    assertEquals(1, Bytes.toInt(results1[0].getRow()));
+    assertEquals(5, results1[0].rawCells().length);
+    for (int i = 0; i < 5; i++) {
+      assertEquals(1, Bytes.toInt(results1[0].getValue(CF, Bytes.toBytes("cq" + i))));
+    }
+
+    Result[] results2 = resultCache.addAndGet(
+      new Result[] { Result.create(Arrays.copyOfRange(cells1, 1, 10), null, false, true) }, false);
+    assertEquals(1, results2.length);
+    assertEquals(1, Bytes.toInt(results2[0].getRow()));
+    assertEquals(5, results2[0].rawCells().length);
+    for (int i = 5; i < 10; i++) {
+      assertEquals(1, Bytes.toInt(results2[0].getValue(CF, Bytes.toBytes("cq" + i))));
+    }
+
+    Result[] results3 =
+        resultCache.addAndGet(new Result[] { Result.create(cells1), Result.create(cells2) }, false);
+    assertEquals(1, results3.length);
+    assertEquals(2, Bytes.toInt(results3[0].getRow()));
+    assertEquals(10, results3[0].rawCells().length);
+    for (int i = 0; i < 10; i++) {
+      assertEquals(2, Bytes.toInt(results3[0].getValue(CF, Bytes.toBytes("cq" + i))));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/6be8d204/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java
new file mode 100644
index 0000000..31a4594
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ SmallTests.class, ClientTests.class })
+public class TestBatchScanResultCache {
+
+  private static byte[] CF = Bytes.toBytes("cf");
+
+  private BatchScanResultCache resultCache;
+
+  @Before
+  public void setUp() {
+    resultCache = new BatchScanResultCache(4);
+  }
+
+  @After
+  public void tearDown() {
+    resultCache.clear();
+    resultCache = null;
+  }
+
+  static Cell createCell(byte[] cf, int key, int cq) {
+    return new KeyValue(Bytes.toBytes(key), cf, Bytes.toBytes("cq" + cq), Bytes.toBytes(key));
+  }
+
+  static Cell[] createCells(byte[] cf, int key, int numCqs) {
+    Cell[] cells = new Cell[numCqs];
+    for (int i = 0; i < numCqs; i++) {
+      cells[i] = createCell(cf, key, i);
+    }
+    return cells;
+  }
+
+  private void assertResultEquals(Result result, int key, int start, int to) {
+    assertEquals(to - start, result.size());
+    for (int i = start; i < to; i++) {
+      assertEquals(key, Bytes.toInt(result.getValue(CF, Bytes.toBytes("cq" + i))));
+    }
+    assertEquals(to - start == 4, result.mayHaveMoreCellsInRow());
+  }
+
+  @Test
+  public void test() throws IOException {
+    assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
+      resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false));
+    assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
+      resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
+
+    Cell[] cells1 = createCells(CF, 1, 10);
+    Cell[] cells2 = createCells(CF, 2, 10);
+    Cell[] cells3 = createCells(CF, 3, 10);
+    assertEquals(0, resultCache.addAndGet(
+      new Result[] { Result.create(Arrays.copyOf(cells1, 3), null, false, true) }, false).length);
+    Result[] results = resultCache.addAndGet(
+      new Result[] { Result.create(Arrays.copyOfRange(cells1, 3, 7), null, false, true),
+          Result.create(Arrays.copyOfRange(cells1, 7, 10), null, false, true) },
+      false);
+    assertEquals(2, results.length);
+    assertResultEquals(results[0], 1, 0, 4);
+    assertResultEquals(results[1], 1, 4, 8);
+    results = resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false);
+    assertEquals(1, results.length);
+    assertResultEquals(results[0], 1, 8, 10);
+
+    results = resultCache.addAndGet(
+      new Result[] { Result.create(Arrays.copyOfRange(cells2, 0, 4), null, false, true),
+          Result.create(Arrays.copyOfRange(cells2, 4, 8), null, false, true),
+          Result.create(Arrays.copyOfRange(cells2, 8, 10), null, false, true),
+          Result.create(Arrays.copyOfRange(cells3, 0, 4), null, false, true),
+          Result.create(Arrays.copyOfRange(cells3, 4, 8), null, false, true),
+          Result.create(Arrays.copyOfRange(cells3, 8, 10), null, false, false) },
+      false);
+    assertEquals(6, results.length);
+    assertResultEquals(results[0], 2, 0, 4);
+    assertResultEquals(results[1], 2, 4, 8);
+    assertResultEquals(results[2], 2, 8, 10);
+    assertResultEquals(results[3], 3, 0, 4);
+    assertResultEquals(results[4], 3, 4, 8);
+    assertResultEquals(results[5], 3, 8, 10);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/6be8d204/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
new file mode 100644
index 0000000..8759593
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ SmallTests.class, ClientTests.class })
+public class TestCompleteResultScanResultCache {
+
+  private static byte[] CF = Bytes.toBytes("cf");
+
+  private static byte[] CQ1 = Bytes.toBytes("cq1");
+
+  private static byte[] CQ2 = Bytes.toBytes("cq2");
+
+  private static byte[] CQ3 = Bytes.toBytes("cq3");
+
+  private CompleteScanResultCache resultCache;
+
+  @Before
+  public void setUp() {
+    resultCache = new CompleteScanResultCache();
+  }
+
+  @After
+  public void tearDown() {
+    resultCache.clear();
+    resultCache = null;
+  }
+
+  private static Cell createCell(int key, byte[] cq) {
+    return new KeyValue(Bytes.toBytes(key), CF, cq, Bytes.toBytes(key));
+  }
+
+  @Test
+  public void testNoPartial() throws IOException {
+    assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
+      resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false));
+    assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
+      resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
+    int count = 10;
+    Result[] results = new Result[count];
+    for (int i = 0; i < count; i++) {
+      results[i] = Result.create(Arrays.asList(createCell(i, CQ1)));
+    }
+    assertSame(results, resultCache.addAndGet(results, false));
+  }
+
+  @Test
+  public void testCombine1() throws IOException {
+    Result previousResult = Result.create(Arrays.asList(createCell(0, CQ1)), null, false, true);
+    Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true);
+    Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true);
+    Result result3 = Result.create(Arrays.asList(createCell(1, CQ3)), null, false, true);
+    Result[] results = resultCache.addAndGet(new Result[] { previousResult, result1 }, false);
+    assertEquals(1, results.length);
+    assertSame(previousResult, results[0]);
+
+    assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length);
+    assertEquals(0, resultCache.addAndGet(new Result[] { result3 }, false).length);
+    assertEquals(0, resultCache.addAndGet(new Result[0], true).length);
+
+    results = resultCache.addAndGet(new Result[0], false);
+    assertEquals(1, results.length);
+    assertEquals(1, Bytes.toInt(results[0].getRow()));
+    assertEquals(3, results[0].rawCells().length);
+    assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1)));
+    assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2)));
+    assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ3)));
+  }
+
+  @Test
+  public void testCombine2() throws IOException {
+    Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true);
+    Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true);
+    Result result3 = Result.create(Arrays.asList(createCell(1, CQ3)), null, false, true);
+    Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, true);
+    Result nextToNextResult1 = Result.create(Arrays.asList(createCell(3, CQ2)), null, false, false);
+
+    assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length);
+    assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length);
+    assertEquals(0, resultCache.addAndGet(new Result[] { result3 }, false).length);
+
+    Result[] results = resultCache.addAndGet(new Result[] { nextResult1 }, false);
+    assertEquals(1, results.length);
+    assertEquals(1, Bytes.toInt(results[0].getRow()));
+    assertEquals(3, results[0].rawCells().length);
+    assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1)));
+    assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2)));
+    assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ3)));
+
+    results = resultCache.addAndGet(new Result[] { nextToNextResult1 }, false);
+    assertEquals(2, results.length);
+    assertEquals(2, Bytes.toInt(results[0].getRow()));
+    assertEquals(1, results[0].rawCells().length);
+    assertEquals(2, Bytes.toInt(results[0].getValue(CF, CQ1)));
+    assertEquals(3, Bytes.toInt(results[1].getRow()));
+    assertEquals(1, results[1].rawCells().length);
+    assertEquals(3, Bytes.toInt(results[1].getValue(CF, CQ2)));
+  }
+
+  @Test
+  public void testCombine3() throws IOException {
+    Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true);
+    Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true);
+    Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, false);
+    Result nextToNextResult1 = Result.create(Arrays.asList(createCell(3, CQ1)), null, false, true);
+
+    assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length);
+    assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length);
+
+    Result[] results =
+        resultCache.addAndGet(new Result[] { nextResult1, nextToNextResult1 }, false);
+    assertEquals(2, results.length);
+    assertEquals(1, Bytes.toInt(results[0].getRow()));
+    assertEquals(2, results[0].rawCells().length);
+    assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1)));
+    assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2)));
+    assertEquals(2, Bytes.toInt(results[1].getRow()));
+    assertEquals(1, results[1].rawCells().length);
+    assertEquals(2, Bytes.toInt(results[1].getValue(CF, CQ1)));
+
+    results = resultCache.addAndGet(new Result[0], false);
+    assertEquals(1, results.length);
+    assertEquals(3, Bytes.toInt(results[0].getRow()));
+    assertEquals(1, results[0].rawCells().length);
+    assertEquals(3, Bytes.toInt(results[0].getValue(CF, CQ1)));
+  }
+
+  @Test
+  public void testCombine4() throws IOException {
+    Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true);
+    Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, false);
+    Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, true);
+    Result nextResult2 = Result.create(Arrays.asList(createCell(2, CQ2)), null, false, false);
+
+    assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length);
+
+    Result[] results = resultCache.addAndGet(new Result[] { result2, nextResult1 }, false);
+    assertEquals(1, results.length);
+    assertEquals(1, Bytes.toInt(results[0].getRow()));
+    assertEquals(2, results[0].rawCells().length);
+    assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1)));
+    assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2)));
+
+    results = resultCache.addAndGet(new Result[] { nextResult2 }, false);
+    assertEquals(1, results.length);
+    assertEquals(2, Bytes.toInt(results[0].getRow()));
+    assertEquals(2, results[0].rawCells().length);
+    assertEquals(2, Bytes.toInt(results[0].getValue(CF, CQ1)));
+    assertEquals(2, Bytes.toInt(results[0].getValue(CF, CQ2)));
+  }
+}