You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/04/09 22:59:27 UTC

[22/50] [abbrv] hbase git commit: HBASE-13421 Reduce the number of object creations introduced by HBASE-11544 in scan RPC hot code paths

HBASE-13421 Reduce the number of object creations introduced by HBASE-11544 in scan RPC hot code paths

Signed-off-by: stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/62d47e17
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/62d47e17
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/62d47e17

Branch: refs/heads/hbase-12439
Commit: 62d47e175c7c36dc2bd6b225d03978cd6303fc59
Parents: cbc53a0
Author: Jonathan Lawlor <jo...@cloudera.com>
Authored: Tue Mar 24 15:52:46 2015 -0700
Committer: stack <st...@apache.org>
Committed: Wed Apr 8 14:02:49 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/ClientScanner.java      |   3 +
 .../client/ScannerCallableWithReplicas.java     |  34 +-
 .../coprocessor/example/BulkDeleteEndpoint.java |   3 +-
 .../coprocessor/example/RowCountEndpoint.java   |   5 +-
 .../hbase/client/ClientSideRegionScanner.java   |   8 +-
 .../coprocessor/AggregateImplementation.java    |  15 +-
 .../hadoop/hbase/regionserver/HRegion.java      | 271 +++++-----
 .../hbase/regionserver/InternalScanner.java     | 209 +-------
 .../hadoop/hbase/regionserver/KeyValueHeap.java |  44 +-
 .../regionserver/NoLimitScannerContext.java     | 102 ++++
 .../hbase/regionserver/RSRpcServices.java       |  66 +--
 .../hbase/regionserver/RegionScanner.java       |  50 +-
 .../hbase/regionserver/ScannerContext.java      | 527 +++++++++++++++++++
 .../hadoop/hbase/regionserver/StoreFlusher.java |   7 +-
 .../hadoop/hbase/regionserver/StoreScanner.java |  78 ++-
 .../regionserver/compactions/Compactor.java     |   7 +-
 .../security/access/AccessControlLists.java     |   3 +-
 .../hbase/security/access/AccessController.java |   6 +-
 .../org/apache/hadoop/hbase/HBaseTestCase.java  |   3 +-
 .../hbase/TestPartialResultsFromClientSide.java |   8 +-
 .../hbase/client/TestIntraRowPagination.java    |   3 +-
 .../hadoop/hbase/client/TestReplicasClient.java |  90 +++-
 .../coprocessor/ColumnAggregationEndpoint.java  |   3 +-
 .../ColumnAggregationEndpointNullResponse.java  |   3 +-
 .../ColumnAggregationEndpointWithErrors.java    |   3 +-
 .../coprocessor/TestCoprocessorInterface.java   |  23 +-
 .../TestRegionObserverInterface.java            |  19 +-
 .../hbase/filter/TestColumnPrefixFilter.java    |   7 +-
 .../hbase/filter/TestDependentColumnFilter.java |   3 +-
 .../apache/hadoop/hbase/filter/TestFilter.java  |  29 +-
 .../filter/TestInvocationRecordFilter.java      |   5 +-
 .../filter/TestMultipleColumnPrefixFilter.java  |   9 +-
 .../hbase/io/encoding/TestPrefixTree.java       |  11 +-
 .../TestScannerSelectionUsingKeyRange.java      |   5 +-
 .../io/hfile/TestScannerSelectionUsingTTL.java  |   3 +-
 .../hbase/regionserver/TestAtomicOperation.java |   9 +-
 .../hbase/regionserver/TestBlocksScanned.java   |   8 +-
 .../hbase/regionserver/TestColumnSeeking.java   |   5 +-
 .../hbase/regionserver/TestDefaultMemStore.java |   9 +-
 .../regionserver/TestGetClosestAtOrBefore.java  |   5 +-
 .../hadoop/hbase/regionserver/TestHRegion.java  | 104 ++--
 .../hbase/regionserver/TestKeepDeletes.java     |   6 +-
 .../hbase/regionserver/TestMajorCompaction.java |   9 +-
 .../regionserver/TestMultiColumnScanner.java    |   3 +-
 .../TestRegionMergeTransaction.java             |   3 +-
 .../regionserver/TestReversibleScanners.java    |   3 +-
 .../regionserver/TestScanWithBloomError.java    |   3 +-
 .../hadoop/hbase/regionserver/TestScanner.java  |  11 +-
 .../regionserver/TestSeekOptimizations.java     |   3 +-
 .../regionserver/TestSplitTransaction.java      |   3 +-
 .../hbase/regionserver/TestStoreScanner.java    |  53 +-
 .../hbase/regionserver/TestStripeCompactor.java |  16 +-
 .../hbase/regionserver/TestWideScanner.java     |   3 +-
 .../compactions/TestStripeCompactionPolicy.java |  18 +-
 .../hbase/regionserver/wal/TestWALReplay.java   |   3 +-
 .../apache/hadoop/hbase/util/TestMergeTool.java |   3 +-
 56 files changed, 1167 insertions(+), 778 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/62d47e17/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 05a780c..ccd8c2d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -401,6 +401,9 @@ public class ClientScanner extends AbstractClientScanner {
         // happens for the cases where we see exceptions. Since only openScanner
         // would have happened, values would be null
         if (values == null && callable.switchedToADifferentReplica()) {
+          // Any accumulated partial results are no longer valid since the callable will
+          // openScanner with the correct startkey and we must pick up from there
+          clearPartialResults();
           this.currentRegion = callable.getHRegionInfo();
           continue;
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/62d47e17/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index ca6ab05..7ba152b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -292,14 +292,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
         continue; //this was already scheduled earlier
       }
       ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
-
-      if (this.lastResult != null) {
-        if(s.getScan().isReversed()){
-          s.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
-        }else {
-          s.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
-        }
-      }
+      setStartRowForReplicaCallable(s);
       outstandingCallables.add(s);
       RetryingRPC retryingOnReplica = new RetryingRPC(s);
       cs.submit(retryingOnReplica, scannerTimeout, id);
@@ -307,6 +300,31 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
     return max - min + 1;
   }
 
+  /**
+   * Set the start row for the replica callable based on the state of the last result received.
+   * @param callable The callable to set the start row on
+   */
+  private void setStartRowForReplicaCallable(ScannerCallable callable) {
+    if (this.lastResult == null || callable == null) return;
+
+    if (this.lastResult.isPartial()) {
+      // The last result was a partial result which means we have not received all of the cells
+      // for this row. Thus, use the last result's row as the start row. If a replica switch
+      // occurs, the scanner will ensure that any accumulated partial results are cleared,
+      // and the scan can resume from this row.
+      callable.getScan().setStartRow(this.lastResult.getRow());
+    } else {
+      // The last result was not a partial result which means it contained all of the cells for
+      // that row (we no longer need any information from it). Set the start row to the next
+      // closest row that could be seen.
+      if (callable.getScan().isReversed()) {
+        callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
+      } else {
+        callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
+      }
+    }
+  }
+
   @VisibleForTesting
   boolean isAnyRPCcancelled() {
     return someRPCcancelled;

http://git-wip-us.apache.org/repos/asf/hbase/blob/62d47e17/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
index e0c3bae..93f98ac 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.Bu
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.OperationStatus;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -137,7 +136,7 @@ public class BulkDeleteEndpoint extends BulkDeleteService implements Coprocessor
         List<List<Cell>> deleteRows = new ArrayList<List<Cell>>(rowBatchSize);
         for (int i = 0; i < rowBatchSize; i++) {
           List<Cell> results = new ArrayList<Cell>();
-          hasMore = NextState.hasMoreValues(scanner.next(results));
+          hasMore = scanner.next(results);
           if (results.size() > 0) {
             deleteRows.add(results);
           }

http://git-wip-us.apache.org/repos/asf/hbase/blob/62d47e17/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
index 2afd05e..4309cdc 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.protobuf.RpcCallback;
@@ -81,7 +80,7 @@ public class RowCountEndpoint extends ExampleProtos.RowCountService
       byte[] lastRow = null;
       long count = 0;
       do {
-        hasMore = NextState.hasMoreValues(scanner.next(results));
+        hasMore = scanner.next(results);
         for (Cell kv : results) {
           byte[] currentRow = CellUtil.cloneRow(kv);
           if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
@@ -120,7 +119,7 @@ public class RowCountEndpoint extends ExampleProtos.RowCountService
       boolean hasMore = false;
       long count = 0;
       do {
-        hasMore = NextState.hasMoreValues(scanner.next(results));
+        hasMore = scanner.next(results);
         for (Cell kv : results) {
           count++;
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/62d47e17/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
index a80a07e..5809983 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -30,8 +29,10 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.mortbay.log.Log;
 
@@ -72,10 +73,7 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
   public Result next() throws IOException {
     values.clear();
 
-    // negative values indicate no limits
-    final long remainingResultSize = -1;
-    final int batchLimit = -1;
-    scanner.nextRaw(values, batchLimit, remainingResultSize);
+    scanner.nextRaw(values, NoLimitScannerContext.getInstance());
     if (values.isEmpty()) {
       //we are done
       return null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/62d47e17/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
index b6f834e..81c933b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateReque
 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
@@ -92,7 +91,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
       // qualifier can be null.
       boolean hasMoreRows = false;
       do {
-        hasMoreRows = NextState.hasMoreValues(scanner.next(results));
+        hasMoreRows = scanner.next(results);
         int listSize = results.size();
         for (int i = 0; i < listSize; i++) {
           temp = ci.getValue(colFamily, qualifier, results.get(i));
@@ -146,7 +145,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
       }
       boolean hasMoreRows = false;
       do {
-        hasMoreRows = NextState.hasMoreValues(scanner.next(results));
+        hasMoreRows = scanner.next(results);
         int listSize = results.size();
         for (int i = 0; i < listSize; i++) {
           temp = ci.getValue(colFamily, qualifier, results.get(i));
@@ -200,7 +199,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
       List<Cell> results = new ArrayList<Cell>();
       boolean hasMoreRows = false;
       do {
-        hasMoreRows = NextState.hasMoreValues(scanner.next(results));
+        hasMoreRows = scanner.next(results);
         int listSize = results.size();
         for (int i = 0; i < listSize; i++) {
           temp = ci.getValue(colFamily, qualifier, results.get(i));
@@ -254,7 +253,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
       scanner = env.getRegion().getScanner(scan);
       boolean hasMoreRows = false;
       do {
-        hasMoreRows = NextState.hasMoreValues(scanner.next(results));
+        hasMoreRows = scanner.next(results);
         if (results.size() > 0) {
           counter++;
         }
@@ -313,7 +312,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
     
       do {
         results.clear();
-        hasMoreRows = NextState.hasMoreValues(scanner.next(results));
+        hasMoreRows = scanner.next(results);
         int listSize = results.size();
         for (int i = 0; i < listSize; i++) {
           sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily,
@@ -374,7 +373,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
     
       do {
         tempVal = null;
-        hasMoreRows = NextState.hasMoreValues(scanner.next(results));
+        hasMoreRows = scanner.next(results);
         int listSize = results.size();
         for (int i = 0; i < listSize; i++) {
           tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
@@ -441,7 +440,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
       do {
         tempVal = null;
         tempWeight = null;
-        hasMoreRows = NextState.hasMoreValues(scanner.next(results));
+        hasMoreRows = scanner.next(results);
         int listSize = results.size();
         for (int i = 0; i < listSize; i++) {
           Cell kv = results.get(i);

http://git-wip-us.apache.org/repos/asf/hbase/blob/62d47e17/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 4a8e7cc..e082698 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -141,8 +141,9 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.Stor
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
+import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
+import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
@@ -5175,7 +5176,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     protected Cell joinedContinuationRow = null;
     protected final byte[] stopRow;
     private final FilterWrapper filter;
-    private int batch;
+    private ScannerContext defaultScannerContext;
     protected int isScan;
     private boolean filterClosed = false;
     private long readPt;
@@ -5198,7 +5199,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         this.filter = null;
       }
 
-      this.batch = scan.getBatch();
+      /**
+       * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
+       * scanner context that can be used to enforce the batch limit in the event that a
+       * ScannerContext is not specified during an invocation of next/nextRaw
+       */
+      defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
+
       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
         this.stopRow = null;
       } else {
@@ -5259,7 +5266,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     @Override
     public int getBatch() {
-      return this.batch;
+      return this.defaultScannerContext.getBatchLimit();
     }
 
     /**
@@ -5274,19 +5281,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
 
     @Override
-    public NextState next(List<Cell> outResults)
+    public boolean next(List<Cell> outResults)
         throws IOException {
       // apply the batching limit by default
-      return next(outResults, batch);
-    }
-
-    @Override
-    public NextState next(List<Cell> outResults, int limit) throws IOException {
-      return next(outResults, limit, -1);
+      return next(outResults, defaultScannerContext);
     }
 
     @Override
-    public synchronized NextState next(List<Cell> outResults, int limit, long remainingResultSize)
+    public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)
         throws IOException {
       if (this.filterClosed) {
         throw new UnknownScannerException("Scanner was closed (timed out?) " +
@@ -5296,122 +5298,107 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       startRegionOperation(Operation.SCAN);
       readRequestsCount.increment();
       try {
-        return nextRaw(outResults, limit, remainingResultSize);
+        return nextRaw(outResults, scannerContext);
       } finally {
         closeRegionOperation(Operation.SCAN);
       }
     }
 
     @Override
-    public NextState nextRaw(List<Cell> outResults) throws IOException {
-      return nextRaw(outResults, batch);
+    public boolean nextRaw(List<Cell> outResults) throws IOException {
+      // Use the RegionScanner's context by default
+      return nextRaw(outResults, defaultScannerContext);
     }
 
     @Override
-    public NextState nextRaw(List<Cell> outResults, int limit)
-        throws IOException {
-      return nextRaw(outResults, limit, -1);
-    }
-
-    @Override
-    public NextState nextRaw(List<Cell> outResults, int batchLimit, long remainingResultSize)
+    public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext)
         throws IOException {
       if (storeHeap == null) {
         // scanner is closed
         throw new UnknownScannerException("Scanner was closed");
       }
-      NextState state;
+      boolean moreValues;
       if (outResults.isEmpty()) {
         // Usually outResults is empty. This is true when next is called
         // to handle scan or get operation.
-        state = nextInternal(outResults, batchLimit, remainingResultSize);
+        moreValues = nextInternal(outResults, scannerContext);
       } else {
         List<Cell> tmpList = new ArrayList<Cell>();
-        state = nextInternal(tmpList, batchLimit, remainingResultSize);
+        moreValues = nextInternal(tmpList, scannerContext);
         outResults.addAll(tmpList);
       }
-      // Invalid states should never be returned. Receiving an invalid state means that we have
-      // no clue how to proceed. Throw an exception.
-      if (!NextState.isValidState(state)) {
-        throw new IOException("Invalid state returned from nextInternal. state:" + state);
-      }
 
       // If the size limit was reached it means a partial Result is being returned. Returning a
       // partial Result means that we should not reset the filters; filters should only be reset in
       // between rows
-      if (!state.sizeLimitReached()) resetFilters();
+      if (!scannerContext.partialResultFormed()) resetFilters();
 
       if (isFilterDoneInternal()) {
-        state = NextState.makeState(NextState.State.NO_MORE_VALUES, state.getResultSize());
+        moreValues = false;
       }
-      return state;
+      return moreValues;
     }
 
     /**
-     * @return the state the joinedHeap returned on the call to
-     *         {@link KeyValueHeap#next(List, int, long)}
+     * @return true if more cells exist after this batch, false if scanner is done
      */
-    private NextState populateFromJoinedHeap(List<Cell> results, int limit, long resultSize)
+    private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)
             throws IOException {
       assert joinedContinuationRow != null;
-      NextState state =
-          populateResult(results, this.joinedHeap, limit, resultSize,
+      boolean moreValues =
+          populateResult(results, this.joinedHeap, scannerContext,
           joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(),
           joinedContinuationRow.getRowLength());
-      if (state != null && !state.batchLimitReached() && !state.sizeLimitReached()) {
+
+      if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
         // We are done with this row, reset the continuation.
         joinedContinuationRow = null;
       }
       // As the data is obtained from two independent heaps, we need to
       // ensure that result list is sorted, because Result relies on that.
       Collections.sort(results, comparator);
-      return state;
+      return moreValues;
     }
 
     /**
      * Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is
      * reached, or remainingResultSize (if not -1) is reaced
      * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.
-     * @param remainingResultSize The remaining space within our result size limit. A negative value
-     *          indicate no limit
-     * @param batchLimit Max amount of KVs to place in result list, -1 means no limit.
+     * @param scannerContext
      * @param currentRow Byte array with key we are fetching.
      * @param offset offset for currentRow
      * @param length length for currentRow
      * @return state of last call to {@link KeyValueHeap#next()}
      */
-    private NextState populateResult(List<Cell> results, KeyValueHeap heap, int batchLimit,
-        long remainingResultSize, byte[] currentRow, int offset, short length) throws IOException {
+    private boolean populateResult(List<Cell> results, KeyValueHeap heap,
+        ScannerContext scannerContext, byte[] currentRow, int offset, short length)
+        throws IOException {
       Cell nextKv;
       boolean moreCellsInRow = false;
-      long accumulatedResultSize = 0;
-      List<Cell> tmpResults = new ArrayList<Cell>();
+      boolean tmpKeepProgress = scannerContext.getKeepProgress();
+      // Scanning between column families and thus the scope is between cells
+      LimitScope limitScope = LimitScope.BETWEEN_CELLS;
       do {
-        int remainingBatchLimit = batchLimit - results.size();
-        NextState heapState =
-            heap.next(tmpResults, remainingBatchLimit, remainingResultSize - accumulatedResultSize);
-        results.addAll(tmpResults);
-        accumulatedResultSize += calculateResultSize(tmpResults, heapState);
-        tmpResults.clear();
-
-        if (batchLimit > 0 && results.size() == batchLimit) {
-          return NextState.makeState(NextState.State.BATCH_LIMIT_REACHED, accumulatedResultSize);
-        }
+        // We want to maintain any progress that is made towards the limits while scanning across
+        // different column families. To do this, we toggle the keep progress flag on during calls
+        // to the StoreScanner to ensure that any progress made thus far is not wiped away.
+        scannerContext.setKeepProgress(true);
+        heap.next(results, scannerContext);
+        scannerContext.setKeepProgress(tmpKeepProgress);
 
         nextKv = heap.peek();
         moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
-        boolean sizeLimitReached =
-            remainingResultSize > 0 && accumulatedResultSize >= remainingResultSize;
-        if (moreCellsInRow && sizeLimitReached) {
-          return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, accumulatedResultSize);
+
+        if (scannerContext.checkBatchLimit(limitScope)) {
+          return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
+        } else if (scannerContext.checkSizeLimit(limitScope)) {
+          ScannerContext.NextState state =
+              moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
+          return scannerContext.setScannerState(state).hasMoreValues();
         }
       } while (moreCellsInRow);
 
-      if (nextKv != null) {
-        return NextState.makeState(NextState.State.MORE_VALUES, accumulatedResultSize);
-      } else {
-        return NextState.makeState(NextState.State.NO_MORE_VALUES, accumulatedResultSize);
-      }
+      return nextKv != null;
     }
 
     /**
@@ -5429,30 +5416,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return nextKv != null && CellUtil.matchingRow(nextKv, currentRow, offset, length);
     }
 
-    /**
-     * Calculates the size of the results. If the state of the scanner that these results came from
-     * indicates that an estimate of the result size has already been generated, we can skip the
-     * calculation and use that instead.
-     * @param results List of cells we want to calculate size of
-     * @param state The state returned from the scanner that generated these results
-     * @return aggregate size of results
-     */
-    private long calculateResultSize(List<Cell> results, NextState state) {
-      if (results == null || results.isEmpty()) return 0;
-
-      // In general, the state should contain the estimate because the result size used to
-      // determine when the scan has exceeded its size limit. If the estimate is contained in the
-      // state then we can avoid an unnecesasry calculation.
-      if (state != null && state.hasResultSizeEstimate()) return state.getResultSize();
-
-      long size = 0;
-      for (Cell c : results) {
-        size += CellUtil.estimatedHeapSizeOfWithoutTags(c);
-      }
-
-      return size;
-    }
-
     /*
      * @return True if a filter rules the scanner is over, done.
      */
@@ -5465,20 +5428,37 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return this.filter != null && this.filter.filterAllRemaining();
     }
 
-    private NextState nextInternal(List<Cell> results, int batchLimit, long remainingResultSize)
+    private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
         throws IOException {
       if (!results.isEmpty()) {
         throw new IllegalArgumentException("First parameter should be an empty list");
       }
-      // Estimate of the size (heap size) of the results returned from this method
-      long resultSize = 0;
+      if (scannerContext == null) {
+        throw new IllegalArgumentException("Scanner context cannot be null");
+      }
       RpcCallContext rpcCall = RpcServer.getCurrentCall();
+
+      // Save the initial progress from the Scanner context in these local variables. The progress
+      // may need to be reset a few times if rows are being filtered out so we save the initial
+      // progress.
+      int initialBatchProgress = scannerContext.getBatchProgress();
+      long initialSizeProgress = scannerContext.getSizeProgress();
+
       // The loop here is used only when at some point during the next we determine
       // that due to effects of filters or otherwise, we have an empty row in the result.
       // Then we loop and try again. Otherwise, we must get out on the first iteration via return,
       // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
       // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow).
       while (true) {
+        // Starting to scan a new row. Reset the scanner progress according to whether or not
+        // progress should be kept.
+        if (scannerContext.getKeepProgress()) {
+          // Progress should be kept. Reset to initial values seen at start of method invocation.
+          scannerContext.setProgress(initialBatchProgress, initialSizeProgress);
+        } else {
+          scannerContext.clearProgress();
+        }
+
         if (rpcCall != null) {
           // If a user specifies a too-restrictive or too-slow scanner, the
           // client might time out and disconnect while the server side
@@ -5506,21 +5486,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }
 
         boolean stopRow = isStopRow(currentRow, offset, length);
+        // When has filter row is true it means that the all the cells for a particular row must be
+        // read before a filtering decision can be made. This means that filters where hasFilterRow
+        // run the risk of encountering out of memory errors in the case that they are applied to a
+        // table that has very large rows.
         boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();
 
         // If filter#hasFilterRow is true, partial results are not allowed since allowing them
         // would prevent the filters from being evaluated. Thus, if it is true, change the
-        // remainingResultSize to -1 so that the entire row's worth of cells are fetched.
-        if (hasFilterRow && remainingResultSize > 0) {
-          remainingResultSize = -1;
+        // scope of any limits that could potentially create partial results to
+        // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row
+        if (hasFilterRow) {
           if (LOG.isTraceEnabled()) {
-            LOG.trace("filter#hasFilterRow is true which prevents partial results from being " +
-                " formed. The remainingResultSize of: " + remainingResultSize + " will not " +
-                " be considered when fetching the cells for this row.");
+            LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
+                + " formed. Changing scope of limits that may create partials");
           }
+          scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
         }
 
-        NextState joinedHeapState;
         // Check if we were getting data from the joinedHeap and hit the limit.
         // If not, then it's main path - getting results from storeHeap.
         if (joinedContinuationRow == null) {
@@ -5529,47 +5512,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             if (hasFilterRow) {
               filter.filterRowCells(results);
             }
-            return NextState.makeState(NextState.State.NO_MORE_VALUES, resultSize);
+            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
           }
 
           // Check if rowkey filter wants to exclude this row. If so, loop to next.
           // Technically, if we hit limits before on this row, we don't need this call.
           if (filterRowKey(currentRow, offset, length)) {
             boolean moreRows = nextRow(currentRow, offset, length);
-            if (!moreRows) return NextState.makeState(NextState.State.NO_MORE_VALUES, resultSize);
+            if (!moreRows) {
+              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+            }
             results.clear();
             continue;
           }
 
-          NextState storeHeapState =
-              populateResult(results, this.storeHeap, batchLimit, remainingResultSize, currentRow,
-                offset, length);
-          resultSize += calculateResultSize(results, storeHeapState);
-          // Invalid states should never be returned. If one is seen, throw exception
-          // since we have no way of telling how we should proceed
-          if (!NextState.isValidState(storeHeapState)) {
-            throw new IOException("NextState returned from call storeHeap was invalid");
-          }
-
           // Ok, we are good, let's try to get some results from the main heap.
-          if (storeHeapState.batchLimitReached()) {
-            if (hasFilterRow) {
-              throw new IncompatibleFilterException(
-                "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
-            }
-            // We hit the batch limit.
-            return NextState.makeState(NextState.State.BATCH_LIMIT_REACHED, resultSize);
-          } else if (storeHeapState.sizeLimitReached()) {
+          populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);
+
+          if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
             if (hasFilterRow) {
-              // We try to guard against this case above when remainingResultSize is set to -1 if
-              // hasFilterRow is true. In the even that the guard doesn't work, an exception must be
-              // thrown
               throw new IncompatibleFilterException(
-                  "Filter whose hasFilterRows() returns true is incompatible with scans that"
-                      + " return partial results");
+                  "Filter whose hasFilterRow() returns true is incompatible with scans that must "
+                      + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
             }
-            // We hit the size limit.
-            return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, resultSize);
+            return true;
           }
           Cell nextKv = this.storeHeap.peek();
           stopRow = nextKv == null ||
@@ -5582,17 +5548,31 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
           if (hasFilterRow) {
             ret = filter.filterRowCellsWithRet(results);
+
+            // We don't know how the results have changed after being filtered. Must set progress
+            // according to contents of results now.
+            if (scannerContext.getKeepProgress()) {
+              scannerContext.setProgress(initialBatchProgress, initialSizeProgress);
+            } else {
+              scannerContext.clearProgress();
+            }
+            scannerContext.incrementBatchProgress(results.size());
+            for (Cell cell : results) {
+              scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
+            }
           }
 
           if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
             results.clear();
             boolean moreRows = nextRow(currentRow, offset, length);
-            if (!moreRows) return NextState.makeState(NextState.State.NO_MORE_VALUES, 0);
+            if (!moreRows) {
+              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+            }
 
             // This row was totally filtered out, if this is NOT the last row,
             // we should continue on. Otherwise, nothing else to do.
             if (!stopRow) continue;
-            return NextState.makeState(NextState.State.NO_MORE_VALUES, 0);
+            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
           }
 
           // Ok, we are done with storeHeap for this row.
@@ -5610,31 +5590,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
                     currentRow, offset, length));
             if (mayHaveData) {
               joinedContinuationRow = current;
-              joinedHeapState =
-                  populateFromJoinedHeap(results, batchLimit, remainingResultSize - resultSize);
-              resultSize +=
-                  joinedHeapState != null && joinedHeapState.hasResultSizeEstimate() ?
-                      joinedHeapState.getResultSize() : 0;
-              if (joinedHeapState != null && joinedHeapState.sizeLimitReached()) {
-                return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, resultSize);
+              populateFromJoinedHeap(results, scannerContext);
+
+              if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
+                return true;
               }
             }
           }
         } else {
           // Populating from the joined heap was stopped by limits, populate some more.
-          joinedHeapState =
-              populateFromJoinedHeap(results, batchLimit, remainingResultSize - resultSize);
-          resultSize +=
-              joinedHeapState != null && joinedHeapState.hasResultSizeEstimate() ?
-                  joinedHeapState.getResultSize() : 0;
-          if (joinedHeapState != null && joinedHeapState.sizeLimitReached()) {
-            return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, resultSize);
+          populateFromJoinedHeap(results, scannerContext);
+          if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
+            return true;
           }
         }
         // We may have just called populateFromJoinedMap and hit the limits. If that is
         // the case, we need to call it again on the next next() invocation.
         if (joinedContinuationRow != null) {
-          return NextState.makeState(NextState.State.MORE_VALUES, resultSize);
+          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
         }
 
         // Finally, we are done with both joinedHeap and storeHeap.
@@ -5642,15 +5615,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // the case when SingleColumnValueExcludeFilter is used.
         if (results.isEmpty()) {
           boolean moreRows = nextRow(currentRow, offset, length);
-          if (!moreRows) return NextState.makeState(NextState.State.NO_MORE_VALUES, 0);
+          if (!moreRows) {
+            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+          }
           if (!stopRow) continue;
         }
 
         // We are done. Return the result.
         if (stopRow) {
-          return NextState.makeState(NextState.State.NO_MORE_VALUES, resultSize);
+          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
         } else {
-          return NextState.makeState(NextState.State.MORE_VALUES, resultSize);
+          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
         }
       }
     }
@@ -7269,7 +7244,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           boolean done;
           do {
             kvs.clear();
-            done = NextState.hasMoreValues(scanner.next(kvs));
+            done = scanner.next(kvs);
             if (kvs.size() > 0) LOG.info(kvs);
           } while (done);
         } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/62d47e17/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
index ea5a75f..f73e363 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
@@ -42,218 +42,21 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 @InterfaceAudience.Private
 public interface InternalScanner extends Closeable {
   /**
-   * This class encapsulates all the meaningful state information that we would like the know about
-   * after a call to {@link InternalScanner#next(List)}. While this is not an enum, a restriction on
-   * the possible states is implied through the exposed {@link #makeState(State)} method.
-   */
-  public static class NextState {
-    /**
-     * The possible states we want to restrict ourselves to. This enum is not sufficient to
-     * encapsulate all of the state information since some of the fields of the state must be
-     * dynamic (e.g. resultSize).
-     */
-    public enum State {
-      MORE_VALUES(true),
-      NO_MORE_VALUES(false),
-      SIZE_LIMIT_REACHED(true),
-      BATCH_LIMIT_REACHED(true);
-
-      private boolean moreValues;
-
-      private State(final boolean moreValues) {
-        this.moreValues = moreValues;
-      }
-
-      /**
-       * @return true when the state indicates that more values may follow those that have been
-       *         returned
-       */
-      public boolean hasMoreValues() {
-        return this.moreValues;
-      }
-    }
-
-    /**
-     * state variables
-     */
-    private final State state;
-    private long resultSize;
-
-    /**
-     * Value to use for resultSize when the size has not been calculated. Must be a negative number
-     * so that {@link NextState#hasResultSizeEstimate()} returns false.
-     */
-    private static final long DEFAULT_RESULT_SIZE = -1;
-
-    private NextState(State state, long resultSize) {
-      this.state = state;
-      this.resultSize = resultSize;
-    }
-
-    /**
-     * @param state
-     * @return An instance of {@link NextState} where the size of the results returned from the call
-     *         to {@link InternalScanner#next(List)} is unknown. It it the responsibility of the
-     *         caller of {@link InternalScanner#next(List)} to calculate the result size if needed
-     */
-    public static NextState makeState(final State state) {
-      return makeState(state, DEFAULT_RESULT_SIZE);
-    }
-
-    /**
-     * @param state
-     * @param resultSize
-     * @return An instance of {@link NextState} where the size of the values returned from the call
-     *         to {@link InternalScanner#next(List)} is known. The caller can avoid recalculating
-     *         the result size by using the cached value retrievable via {@link #getResultSize()}
-     */
-    public static NextState makeState(final State state, long resultSize) {
-      switch (state) {
-      case MORE_VALUES:
-        return createMoreValuesState(resultSize);
-      case NO_MORE_VALUES:
-        return createNoMoreValuesState(resultSize);
-      case BATCH_LIMIT_REACHED:
-        return createBatchLimitReachedState(resultSize);
-      case SIZE_LIMIT_REACHED:
-        return createSizeLimitReachedState(resultSize);
-      default:
-        // If the state is not recognized, default to no more value state
-        return createNoMoreValuesState(resultSize);
-      }
-    }
-
-    /**
-     * Convenience method for creating a state that indicates that more values can be scanned
-     * @param resultSize estimate of the size (heap size) of the values returned from the call to
-     *          {@link InternalScanner#next(List)}
-     */
-    private static NextState createMoreValuesState(long resultSize) {
-      return new NextState(State.MORE_VALUES, resultSize);
-    }
-
-    /**
-     * Convenience method for creating a state that indicates that no more values can be scanned.
-     * @param resultSize estimate of the size (heap size) of the values returned from the call to
-     *          {@link InternalScanner#next(List)}
-     */
-    private static NextState createNoMoreValuesState(long resultSize) {
-      return new NextState(State.NO_MORE_VALUES, resultSize);
-    }
-
-    /**
-     * Convenience method for creating a state that indicates that the scan stopped because the
-     * batch limit was exceeded
-     * @param resultSize estimate of the size (heap size) of the values returned from the call to
-     *          {@link InternalScanner#next(List)}
-     */
-    private static NextState createBatchLimitReachedState(long resultSize) {
-      return new NextState(State.BATCH_LIMIT_REACHED, resultSize);
-    }
-
-    /**
-     * Convenience method for creating a state that indicates that the scan stopped due to the size
-     * limit
-     * @param resultSize estimate of the size (heap size) of the values returned from the call to
-     *          {@link InternalScanner#next(List)}
-     */
-    private static NextState createSizeLimitReachedState(long resultSize) {
-      return new NextState(State.SIZE_LIMIT_REACHED, resultSize);
-    }
-
-    /**
-     * @return true when the scanner has more values to be scanned following the values returned by
-     *         the call to {@link InternalScanner#next(List)}
-     */
-    public boolean hasMoreValues() {
-      return this.state.hasMoreValues();
-    }
-
-    /**
-     * @return true when the scanner had to stop scanning because it reached the batch limit
-     */
-    public boolean batchLimitReached() {
-      return this.state == State.BATCH_LIMIT_REACHED;
-    }
-
-    /**
-     * @return true when the scanner had to stop scanning because it reached the size limit
-     */
-    public boolean sizeLimitReached() {
-      return this.state == State.SIZE_LIMIT_REACHED;
-    }
-
-    /**
-     * @return The size (heap size) of the values that were returned from the call to
-     *         {@link InternalScanner#next(List)}. This value should only be used if
-     *         {@link #hasResultSizeEstimate()} returns true.
-     */
-    public long getResultSize() {
-      return resultSize;
-    }
-
-    /**
-     * @return true when an estimate for the size of the values returned by
-     *         {@link InternalScanner#next(List)} was provided. If false, it is the responsibility
-     *         of the caller to calculate the result size
-     */
-    public boolean hasResultSizeEstimate() {
-      return resultSize >= 0;
-    }
-
-    @Override
-    public String toString() {
-      return "State: " + state + " resultSize: " + resultSize;
-    }
-
-    /**
-     * Helper method to centralize all checks as to whether or not the state is valid.
-     * @param state
-     * @return true when the state is valid
-     */
-    public static boolean isValidState(NextState state) {
-      return state != null;
-    }
-
-    /**
-     * @param state
-     * @return true when the state is non null and indicates that more values exist
-     */
-    public static boolean hasMoreValues(NextState state) {
-      return state != null && state.hasMoreValues();
-    }
-  }
-
-  /**
    * Grab the next row's worth of values.
    * @param results return output array
-   * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this
-   *         one, false if scanner is done
+   * @return true if more rows exist after this one, false if scanner is done
    * @throws IOException e
    */
-  NextState next(List<Cell> results) throws IOException;
+  boolean next(List<Cell> results) throws IOException;
 
   /**
-   * Grab the next row's worth of values with a limit on the number of values to return.
-   * @param result return output array
-   * @param limit limit on row count to get
-   * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this
-   *         one, false if scanner is done
-   * @throws IOException e
-   */
-  NextState next(List<Cell> result, int limit) throws IOException;
-
-  /**
-   * Grab the next row's worth of values with a limit on the number of values to return as well as a
-   * restriction on the size of the list of values that are returned.
+   * Grab the next row's worth of values.
    * @param result return output array
-   * @param limit limit on row count to get
-   * @param remainingResultSize limit on the size of the result being returned
-   * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this
-   *         one, false if scanner is done
+   * @param scannerContext
+   * @return true if more rows exist after this one, false if scanner is done
    * @throws IOException e
    */
-  NextState next(List<Cell> result, int limit, long remainingResultSize) throws IOException;
+  boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException;
 
   /**
    * Closes the scanner and releases any resources it has allocated

http://git-wip-us.apache.org/repos/asf/hbase/blob/62d47e17/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
index beb23cf..761267f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
@@ -27,6 +27,7 @@ import java.util.PriorityQueue;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
 
 /**
  * Implements a heap merge across any number of KeyValueScanners.
@@ -128,26 +129,20 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
    * This can ONLY be called when you are using Scanners that implement InternalScanner as well as
    * KeyValueScanner (a {@link StoreScanner}).
    * @param result
-   * @param limit
-   * @return state where NextState#hasMoreValues() is true if more keys exist after this
-   *         one, false if scanner is done
+   * @return true if more rows exist after this one, false if scanner is done
    */
-  public NextState next(List<Cell> result, int limit) throws IOException {
-    return next(result, limit, -1);
+  @Override
+  public boolean next(List<Cell> result) throws IOException {
+    return next(result, NoLimitScannerContext.getInstance());
   }
 
-  public NextState next(List<Cell> result, int limit, long remainingResultSize) throws IOException {
+  @Override
+  public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
     if (this.current == null) {
-      return NextState.makeState(NextState.State.NO_MORE_VALUES);
+      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
     }
     InternalScanner currentAsInternal = (InternalScanner)this.current;
-    NextState state = currentAsInternal.next(result, limit, remainingResultSize);
-    // Invalid states should never be returned. Receiving an invalid state means that we have
-    // no clue how to proceed. Throw an exception.
-    if (!NextState.isValidState(state)) {
-      throw new IOException("Invalid state returned from InternalScanner#next");
-    }
-    boolean mayContainMoreRows = NextState.hasMoreValues(state);
+    boolean moreCells = currentAsInternal.next(result, scannerContext);
     Cell pee = this.current.peek();
     /*
      * By definition, any InternalScanner must return false only when it has no
@@ -156,31 +151,16 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
      * more efficient to close scanners which are not needed than keep them in
      * the heap. This is also required for certain optimizations.
      */
-    if (pee == null || !mayContainMoreRows) {
+    if (pee == null || !moreCells) {
       this.current.close();
     } else {
       this.heap.add(this.current);
     }
     this.current = pollRealKV();
     if (this.current == null) {
-      state = NextState.makeState(NextState.State.NO_MORE_VALUES, state.getResultSize());
+      moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
     }
-    return state;
-  }
-
-  /**
-   * Gets the next row of keys from the top-most scanner.
-   * <p>
-   * This method takes care of updating the heap.
-   * <p>
-   * This can ONLY be called when you are using Scanners that implement InternalScanner as well as
-   * KeyValueScanner (a {@link StoreScanner}).
-   * @param result
-   * @return state where NextState#hasMoreValues() is true if more keys exist after this
-   *         one, false if scanner is done
-   */
-  public NextState next(List<Cell> result) throws IOException {
-    return next(result, -1);
+    return moreCells;
   }
 
   protected static class KVScannerComparator implements Comparator<KeyValueScanner> {

http://git-wip-us.apache.org/repos/asf/hbase/blob/62d47e17/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
new file mode 100644
index 0000000..1484e80
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * This is a special {@link ScannerContext} subclass that is designed to be used globally when
+ * limits should not be enforced during invocations of {@link InternalScanner#next(java.util.List)}
+ * or {@link RegionScanner#next(java.util.List)}.
+ * <p>
+ * Instances of {@link NoLimitScannerContext} are immutable after construction. Any attempt to
+ * change the limits or progress of a {@link NoLimitScannerContext} will fail silently. The net
+ * effect is that all limit checks will return false, thus indicating that a limit has not been
+ * reached.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public class NoLimitScannerContext extends ScannerContext {
+
+  public NoLimitScannerContext() {
+    super(false, null);
+  }
+
+  /**
+   * Use this instance whenever limits do not need to be enforced.
+   */
+  private static final ScannerContext NO_LIMIT = new NoLimitScannerContext();
+
+  /**
+   * @return The static, immutable instance of {@link NoLimitScannerContext} to be used whenever
+   *         limits should not be enforced
+   */
+  public static final ScannerContext getInstance() {
+    return NO_LIMIT;
+  }
+
+  @Override
+  void setKeepProgress(boolean keepProgress) {
+    // Do nothing. NoLimitScannerContext instances are immutable post-construction
+  }
+
+  @Override
+  void setBatchProgress(int batchProgress) {
+    // Do nothing. NoLimitScannerContext instances are immutable post-construction
+  }
+
+  @Override
+  void setSizeProgress(long sizeProgress) {
+    // Do nothing. NoLimitScannerContext instances are immutable post-construction
+  }
+
+  @Override
+  void setProgress(int batchProgress, long sizeProgress) {
+    // Do nothing. NoLimitScannerContext instances are immutable post-construction
+  }
+
+  @Override
+  void setSizeLimitScope(LimitScope scope) {
+    // Do nothing. NoLimitScannerContext instances are immutable post-construction
+  }
+
+  @Override
+  NextState setScannerState(NextState state) {
+    // Do nothing. NoLimitScannerContext instances are immutable post-construction
+    return state;
+  }
+
+  @Override
+  boolean checkBatchLimit(LimitScope checkerScope) {
+    // No limits can be specified, thus return false to indicate no limit has been reached.
+    return false;
+  }
+
+  @Override
+  boolean checkSizeLimit(LimitScope checkerScope) {
+    // No limits can be specified, thus return false to indicate no limit has been reached.
+    return false;
+  }
+
+  @Override
+  boolean checkAnyLimitReached(LimitScope checkerScope) {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/62d47e17/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 1508a15..10e39a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -105,8 +105,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
@@ -120,6 +118,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfiguratio
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
@@ -151,10 +151,10 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
 import org.apache.hadoop.hbase.quotas.OperationQuota;
 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
 import org.apache.hadoop.hbase.regionserver.Region.Operation;
+import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -2236,61 +2236,53 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
                 // correct ordering of partial results and so we prevent partial results from being
                 // formed.
                 boolean serverGuaranteesOrderOfPartials = currentScanResultSize == 0;
-                boolean enforceMaxResultSizeAtCellLevel =
+                boolean allowPartialResults =
                     clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
-                NextState state = null;
+                boolean moreRows = false;
+
+                final LimitScope sizeScope =
+                    allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
+
+                // Configure with limits for this RPC. Set keep progress true since size progress
+                // towards size limit should be kept between calls to nextRaw
+                ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
+                contextBuilder.setSizeLimit(sizeScope, maxResultSize);
+                contextBuilder.setBatchLimit(scanner.getBatch());
+                ScannerContext scannerContext = contextBuilder.build();
 
                 while (i < rows) {
                   // Stop collecting results if we have exceeded maxResultSize
-                  if (currentScanResultSize >= maxResultSize) {
+                  if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS)) {
                     builder.setMoreResultsInRegion(true);
                     break;
                   }
 
-                  // A negative remainingResultSize communicates that there is no limit on the size
-                  // of the results.
-                  final long remainingResultSize =
-                      enforceMaxResultSizeAtCellLevel ? maxResultSize - currentScanResultSize
-                          : -1;
+                  // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
+                  // batch limit is a limit on the number of cells per Result. Thus, if progress is
+                  // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
+                  // reset the batch progress between nextRaw invocations since we don't want the
+                  // batch progress from previous calls to affect future calls
+                  scannerContext.setBatchProgress(0);
 
                   // Collect values to be returned here
-                  state = scanner.nextRaw(values, scanner.getBatch(), remainingResultSize);
-                  // Invalid states should never be returned. If one is seen, throw exception
-                  // to stop the scan -- We have no way of telling how we should proceed
-                  if (!NextState.isValidState(state)) {
-                    throw new IOException("NextState returned from call to nextRaw was invalid");
-                  }
-                  if (!values.isEmpty()) {
-                    // The state should always contain an estimate of the result size because that
-                    // estimate must be used to decide when partial results are formed.
-                    boolean skipResultSizeCalculation = state.hasResultSizeEstimate();
-                    if (skipResultSizeCalculation) currentScanResultSize += state.getResultSize();
+                  moreRows = scanner.nextRaw(values, scannerContext);
 
+                  if (!values.isEmpty()) {
                     for (Cell cell : values) {
                       totalCellSize += CellUtil.estimatedSerializedSizeOf(cell);
-
-                      // If the calculation can't be skipped, then do it now.
-                      if (!skipResultSizeCalculation) {
-                        currentScanResultSize += CellUtil.estimatedHeapSizeOfWithoutTags(cell);
-                      }
                     }
-                    // The size limit was reached. This means there are more cells remaining in
-                    // the row but we had to stop because we exceeded our max result size. This
-                    // indicates that we are returning a partial result
-                    final boolean partial = state != null && state.sizeLimitReached();
+                    final boolean partial = scannerContext.partialResultFormed();
                     results.add(Result.create(values, null, stale, partial));
                     i++;
                   }
-                  if (!NextState.hasMoreValues(state)) {
+                  if (!moreRows) {
                     break;
                   }
                   values.clear();
                 }
-                // currentScanResultSize >= maxResultSize should be functionally equivalent to
-                // state.sizeLimitReached()
-                if (null != state
-                    && (currentScanResultSize >= maxResultSize || i >= rows || state
-                        .hasMoreValues())) {
+
+                if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS) || i >= rows || 
+                    moreRows) {
                   // We stopped prematurely
                   builder.setMoreResultsInRegion(true);
                 } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/62d47e17/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
index 26f9aef..66e087b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
  * RegionScanner describes iterators over rows in an HRegion.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
-@InterfaceStability.Stable
+@InterfaceStability.Evolving
 public interface RegionScanner extends InternalScanner {
   /**
    * @return The RegionInfo for this scanner.
@@ -74,35 +74,22 @@ public interface RegionScanner extends InternalScanner {
   int getBatch();
 
   /**
-   * Grab the next row's worth of values with the default limit on the number of values to return.
-   * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
-   * Caller must set the thread's readpoint, start and close a region operation, an synchronize on
-   * the scanner object. Caller should maintain and update metrics. See
-   * {@link #nextRaw(List, int, long)}
-   * @param result return output array
-   * @return a state where NextState#hasMoreValues() is true when more rows exist, false when
-   *         scanner is done.
-   * @throws IOException e
-   */
-  NextState nextRaw(List<Cell> result) throws IOException;
-
-  /**
-   * Grab the next row's worth of values with the default limit on the number of values to return.
-   * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
-   * Caller must set the thread's readpoint, start and close a region operation, an synchronize on
-   * the scanner object. Caller should maintain and update metrics. See
-   * {@link #nextRaw(List, int, long)}
+   * Grab the next row's worth of values. This is a special internal method to be called from
+   * coprocessor hooks to avoid expensive setup. Caller must set the thread's readpoint, start and
+   * close a region operation, an synchronize on the scanner object. Caller should maintain and
+   * update metrics. See {@link #nextRaw(List, ScannerContext)}
    * @param result return output array
-   * @param limit limit on row count to get
-   * @return a state where NextState#hasMoreValues() is true when more rows exist, false when
-   *         scanner is done.
+   * @return true if more rows exist after this one, false if scanner is done
    * @throws IOException e
    */
-  NextState nextRaw(List<Cell> result, int limit) throws IOException;
-
+  boolean nextRaw(List<Cell> result) throws IOException;
+  
   /**
-   * Grab the next row's worth of values with a limit on the number of values to return as well as a
-   * limit on the heap size of those values. This is a special internal method to be called from
+   * Grab the next row's worth of values. The {@link ScannerContext} is used to enforce and track
+   * any limits associated with this call. Any progress that exists in the {@link ScannerContext}
+   * prior to calling this method will be LOST if {@link ScannerContext#getKeepProgress()} is false.
+   * Upon returning from this method, the {@link ScannerContext} will contain information about the
+   * progress made towards the limits. This is a special internal method to be called from
    * coprocessor hooks to avoid expensive setup. Caller must set the thread's readpoint, start and
    * close a region operation, an synchronize on the scanner object. Example: <code><pre>
    * HRegion region = ...;
@@ -120,13 +107,12 @@ public interface RegionScanner extends InternalScanner {
    * }
    * </pre></code>
    * @param result return output array
-   * @param limit limit on row count to get
-   * @param remainingResultSize the space remaining within the restriction on the result size.
-   *          Negative values indicate no limit
-   * @return a state where NextState#hasMoreValues() is true when more rows exist, false when
-   *         scanner is done.
+   * @param scannerContext The {@link ScannerContext} instance encapsulating all limits that should
+   *          be tracked during calls to this method. The progress towards these limits can be
+   *          tracked within this instance.
+   * @return true if more rows exist after this one, false if scanner is done
    * @throws IOException e
    */
-  NextState nextRaw(List<Cell> result, int limit, final long remainingResultSize)
+  boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
       throws IOException;
 }