Posted to commits@hbase.apache.org by st...@apache.org on 2015/04/08 17:17:17 UTC

[2/3] hbase git commit: HBASE-11544 [Ergonomics] hbase.client.scanner.caching is dogged and will try to return batch even if it means OOME. Added in some check-style fixes to bring us back under the limit

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
new file mode 100644
index 0000000..6e487ca
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -0,0 +1,527 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * ScannerContext instances encapsulate limit tracking AND progress towards those limits during
+ * invocations of {@link InternalScanner#next(java.util.List)} and
+ * {@link RegionScanner#next(java.util.List)}.
+ * <p>
+ * A ScannerContext instance should be updated periodically throughout execution whenever progress
+ * towards a limit has been made. Each limit can be checked via the appropriate checkLimit method.
+ * <p>
+ * Once a limit has been reached, the scan will stop. The invoker of
+ * {@link InternalScanner#next(java.util.List)} or {@link RegionScanner#next(java.util.List)} can
+ * use the appropriate check*Limit methods to see exactly which limits have been reached.
+ * Alternatively, {@link #checkAnyLimitReached(LimitScope)} is provided to see if ANY limit was
+ * reached.
+ * <p>
+ * {@link NoLimitScannerContext#NO_LIMIT} is an immutable static definition that can be used
+ * whenever a {@link ScannerContext} is needed but limits do not need to be enforced.
+ * <p>
+ * NOTE: It is important that this class only ever expose setter methods that can be safely skipped
+ * when limits should NOT be enforced. This is because of the necessary immutability of the class
+ * {@link NoLimitScannerContext}. If a setter cannot be safely skipped, the immutable nature of
+ * {@link NoLimitScannerContext} will lead to incorrect behavior.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public class ScannerContext {
+  private static final Log LOG = LogFactory.getLog(ScannerContext.class);
+
+  /**
+ * Two sets of the same fields: one for the limits, another for the progress towards those limits.
+   */
+  LimitFields limits;
+  LimitFields progress;
+
+  /**
+   * The state of the scanner after the invocation of {@link InternalScanner#next(java.util.List)}
+   * or {@link RegionScanner#next(java.util.List)}.
+   */
+  NextState scannerState;
+  private static final NextState DEFAULT_STATE = NextState.MORE_VALUES;
+
+  /**
+   * Used as an indication to invocations of {@link InternalScanner#next(java.util.List)} and
+   * {@link RegionScanner#next(java.util.List)} that, if true, the progress tracked within this
+   * {@link ScannerContext} instance should be considered while evaluating the limits. Useful for
+   * enforcing a set of limits across multiple calls (i.e. the limit may not be reached in a single
+   * invocation, but any progress made should be considered in future invocations)
+   * <p>
+ * Defaulting this value to false means that any tracked progress will be wiped clean on
+ * invocations of {@link InternalScanner#next(java.util.List)} and
+ * {@link RegionScanner#next(java.util.List)}, and each call will be treated as though no progress
+ * has been made towards the limits so far.
+   * <p>
+   * This is an important mechanism. Users of Internal/Region scanners expect that they can define
+   * some limits and then repeatedly invoke {@link InternalScanner#next(List)} or
+   * {@link RegionScanner#next(List)} where each invocation respects these limits separately.
+   * <p>
+   * For example: <code><pre>
+   * ScannerContext context = ScannerContext.newBuilder().setBatchLimit(5).build();
+   * RegionScanner scanner = ...
+   * List<Cell> results = new ArrayList<Cell>();
+   * while(scanner.next(results, context)) {
+   *   // Do something with a batch of 5 cells
+   *   results.clear(); // reuse the list; progress is reset each call since keepProgress is false
+   * }
+   * </pre></code> However, in the case of RPCs, the server wants to be able to define a set of
+   * limits for a particular RPC request and have those limits respected across multiple
+   * invocations. This means that the progress made towards the limits in earlier calls will be
+   * saved and considered in future invocations.
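+   * <p>
+   * A minimal sketch of that server-side pattern (illustrative only; assumes a scanner and cell
+   * list obtained elsewhere): <code><pre>
+   * // keepProgress = true: progress survives across next() invocations
+   * ScannerContext context = ScannerContext.newBuilder(true).setBatchLimit(100).build();
+   * boolean moreValues = true;
+   * while (moreValues && !context.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
+   *   moreValues = scanner.next(results, context);
+   * }
+   * </pre></code>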
+   */
+  boolean keepProgress;
+  private static final boolean DEFAULT_KEEP_PROGRESS = false;
+
+  ScannerContext(boolean keepProgress, LimitFields limitsToCopy) {
+    this.limits = new LimitFields();
+    if (limitsToCopy != null) this.limits.copy(limitsToCopy);
+
+    // Progress fields are initialized to 0
+    progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0);
+
+    this.keepProgress = keepProgress;
+    this.scannerState = DEFAULT_STATE;
+  }
+
+  /**
+   * @return true if the progress tracked so far in this instance will be considered during an
+   *         invocation of {@link InternalScanner#next(java.util.List)} or
+   *         {@link RegionScanner#next(java.util.List)}. false when the progress tracked so far
+   *         should not be considered and should instead be wiped away via {@link #clearProgress()}
+   */
+  boolean getKeepProgress() {
+    return keepProgress;
+  }
+
+  void setKeepProgress(boolean keepProgress) {
+    this.keepProgress = keepProgress;
+  }
+
+  /**
+   * Progress towards the batch limit has been made. Increment internal tracking of batch progress
+   */
+  void incrementBatchProgress(int batch) {
+    int currentBatch = progress.getBatch();
+    progress.setBatch(currentBatch + batch);
+  }
+
+  /**
+   * Progress towards the size limit has been made. Increment internal tracking of size progress
+   */
+  void incrementSizeProgress(long size) {
+    long currentSize = progress.getSize();
+    progress.setSize(currentSize + size);
+  }
+
+  int getBatchProgress() {
+    return progress.getBatch();
+  }
+
+  long getSizeProgress() {
+    return progress.getSize();
+  }
+
+  void setProgress(int batchProgress, long sizeProgress) {
+    setBatchProgress(batchProgress);
+    setSizeProgress(sizeProgress);
+  }
+
+  void setSizeProgress(long sizeProgress) {
+    progress.setSize(sizeProgress);
+  }
+
+  void setBatchProgress(int batchProgress) {
+    progress.setBatch(batchProgress);
+  }
+
+  /**
+   * Clear away any progress that has been made so far. All progress fields are reset to initial
+   * values
+   */
+  void clearProgress() {
+    progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0);
+  }
+
+  /**
+   * Note that this is not a typical setter. This setter returns the {@link NextState} that was
+   * passed in so that methods can be invoked against the new state. Furthermore, this pattern
+   * allows the {@link NoLimitScannerContext} to cleanly override this setter and simply return the
+   * new state, thus preserving the immutability of {@link NoLimitScannerContext}.
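+   * <p>
+   * For example, scanners can record the state and return the corresponding boolean in a single
+   * expression: <code><pre>
+   * return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+   * </pre></code>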
+   * @param state the new state to set on this {@link ScannerContext}
+   * @return the state that was just set, so that calls may be chained
+   */
+  NextState setScannerState(NextState state) {
+    if (!NextState.isValidState(state)) {
+      throw new IllegalArgumentException("Cannot set to invalid state: " + state);
+    }
+
+    this.scannerState = state;
+    return state;
+  }
+
+  /**
+   * @return true when a partial result is formed. A partial result is formed when a limit is
+   *         reached in the middle of a row.
+   */
+  boolean partialResultFormed() {
+    return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW;
+  }
+
+  /**
+   * @param checkerScope The scope that the limit is being checked from
+   * @return true if the batch limit can be enforced in the checker's scope
+   */
+  boolean hasBatchLimit(LimitScope checkerScope) {
+    return limits.canEnforceBatchLimitFromScope(checkerScope) && limits.getBatch() > 0;
+  }
+
+  /**
+   * @param checkerScope The scope that the limit is being checked from
+   * @return true if the size limit can be enforced in the checker's scope
+   */
+  boolean hasSizeLimit(LimitScope checkerScope) {
+    return limits.canEnforceSizeLimitFromScope(checkerScope) && limits.getSize() > 0;
+  }
+
+  /**
+   * @param checkerScope The scope that the limits are being checked from
+   * @return true if any limit can be enforced within the checker's scope
+   */
+  boolean hasAnyLimit(LimitScope checkerScope) {
+    return hasBatchLimit(checkerScope) || hasSizeLimit(checkerScope);
+  }
+
+  /**
+   * @param scope The scope in which the size limit will be enforced
+   */
+  void setSizeLimitScope(LimitScope scope) {
+    limits.setSizeScope(scope);
+  }
+
+  int getBatchLimit() {
+    return limits.getBatch();
+  }
+
+  long getSizeLimit() {
+    return limits.getSize();
+  }
+
+  /**
+   * @param checkerScope The scope that the limit is being checked from
+   * @return true when the limit is enforceable from the checker's scope and it has been reached
+   */
+  boolean checkBatchLimit(LimitScope checkerScope) {
+    return hasBatchLimit(checkerScope) && progress.getBatch() >= limits.getBatch();
+  }
+
+  /**
+   * @param checkerScope The scope that the limit is being checked from
+   * @return true when the limit is enforceable from the checker's scope and it has been reached
+   */
+  boolean checkSizeLimit(LimitScope checkerScope) {
+    return hasSizeLimit(checkerScope) && progress.getSize() >= limits.getSize();
+  }
+
+  /**
+   * @param checkerScope The scope that the limits are being checked from
+   * @return true when some limit is enforceable from the checker's scope and it has been reached
+   */
+  boolean checkAnyLimitReached(LimitScope checkerScope) {
+    return checkSizeLimit(checkerScope) || checkBatchLimit(checkerScope);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("{");
+
+    sb.append("limits:");
+    sb.append(limits);
+
+    sb.append(", progress:");
+    sb.append(progress);
+
+    sb.append(", keepProgress:");
+    sb.append(keepProgress);
+
+    sb.append(", state:");
+    sb.append(scannerState);
+
+    sb.append("}");
+    return sb.toString();
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static Builder newBuilder(boolean keepProgress) {
+    return new Builder(keepProgress);
+  }
+
+  public static final class Builder {
+    boolean keepProgress = DEFAULT_KEEP_PROGRESS;
+    LimitFields limits = new LimitFields();
+
+    private Builder() {
+    }
+
+    private Builder(boolean keepProgress) {
+      this.keepProgress = keepProgress;
+    }
+
+    public Builder setKeepProgress(boolean keepProgress) {
+      this.keepProgress = keepProgress;
+      return this;
+    }
+
+    public Builder setSizeLimit(LimitScope sizeScope, long sizeLimit) {
+      limits.setSize(sizeLimit);
+      limits.setSizeScope(sizeScope);
+      return this;
+    }
+
+    public Builder setBatchLimit(int batchLimit) {
+      limits.setBatch(batchLimit);
+      return this;
+    }
+
+    public ScannerContext build() {
+      return new ScannerContext(keepProgress, limits);
+    }
+  }
+
+  /**
+   * The possible states a scanner may be in following a call to {@link InternalScanner#next(List)}
+   */
+  public enum NextState {
+    MORE_VALUES(true, false),
+    NO_MORE_VALUES(false, false),
+    SIZE_LIMIT_REACHED(true, true),
+
+    /**
+     * Special case of size limit reached to indicate that the size limit was reached in the middle
+     * of a row and thus a partial result was formed
+     */
+    SIZE_LIMIT_REACHED_MID_ROW(true, true),
+    BATCH_LIMIT_REACHED(true, true);
+
+    private final boolean moreValues;
+    private final boolean limitReached;
+
+    private NextState(boolean moreValues, boolean limitReached) {
+      this.moreValues = moreValues;
+      this.limitReached = limitReached;
+    }
+
+    /**
+     * @return true when the state indicates that more values may follow those that have been
+     *         returned
+     */
+    public boolean hasMoreValues() {
+      return this.moreValues;
+    }
+
+    /**
+     * @return true when the state indicates that a limit has been reached and scan should stop
+     */
+    public boolean limitReached() {
+      return this.limitReached;
+    }
+
+    public static boolean isValidState(NextState state) {
+      return state != null;
+    }
+
+    public static boolean hasMoreValues(NextState state) {
+      return isValidState(state) && state.hasMoreValues();
+    }
+  }
+
+  /**
+   * The various scopes where a limit can be enforced. Used to differentiate when a limit should be
+   * enforced or not.
+   */
+  public enum LimitScope {
+    /**
+     * Enforcing a limit between rows means that the limit will not be considered until all the
+     * cells for a particular row have been retrieved
+     */
+    BETWEEN_ROWS(0),
+
+    /**
+     * Enforcing a limit between cells means that the limit will be considered after each full cell
+     * has been retrieved
+     */
+    BETWEEN_CELLS(1);
+
+    /**
+     * When enforcing a limit, we must check that the scope is appropriate for enforcement.
+     * <p>
+     * To communicate this concept, each scope has a depth. A limit will be enforced if the depth
+     * of the checker's scope is less than or equal to the depth of the limit's scope. This means
+     * that when checking limits, the checker must know its own scope (i.e. is it checking the
+     * limits between rows, between cells, etc.)
+     */
+    final int depth;
+
+    LimitScope(int depth) {
+      this.depth = depth;
+    }
+
+    int depth() {
+      return depth;
+    }
+
+    /**
+     * @param checkerScope The scope in which the limit is being checked
+     * @return true when the checker is in a scope that indicates the limit can be enforced. Limits
+     *         can be enforced from "higher or equal" scopes (i.e. the checker's scope is at a
+     *         lesser depth than the limit)
+     */
+    boolean canEnforceLimitFromScope(LimitScope checkerScope) {
+      return checkerScope != null && checkerScope.depth() <= depth;
+    }
+  }
+
+  /**
+   * The different fields that can be used as limits in calls to
+   * {@link InternalScanner#next(java.util.List)} and {@link RegionScanner#next(java.util.List)}
+   */
+  private static class LimitFields {
+    /**
+     * Default values of the limit fields. Defined such that if a field does NOT change from its
+     * default, it will not be enforced
+     */
+    private static final int DEFAULT_BATCH = -1;
+    private static final long DEFAULT_SIZE = -1L;
+
+    /**
+     * Default scope that is assigned to a limit if a scope is not specified.
+     */
+    private static final LimitScope DEFAULT_SCOPE = LimitScope.BETWEEN_ROWS;
+
+    // The batch limit will always be enforced between cells, thus, there isn't a field to hold the
+    // batch scope
+    int batch = DEFAULT_BATCH;
+
+    LimitScope sizeScope = DEFAULT_SCOPE;
+    long size = DEFAULT_SIZE;
+
+    /**
+     * Fields keep their default values.
+     */
+    LimitFields() {
+    }
+
+    LimitFields(int batch, LimitScope sizeScope, long size) {
+      setFields(batch, sizeScope, size);
+    }
+
+    void copy(LimitFields limitsToCopy) {
+      if (limitsToCopy != null) {
+        setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getSize());
+      }
+    }
+
+    /**
+     * Set all fields together.
+     * @param batch the batch limit
+     * @param sizeScope scope in which the size limit is enforced
+     * @param size the size limit
+     */
+    void setFields(int batch, LimitScope sizeScope, long size) {
+      setBatch(batch);
+      setSizeScope(sizeScope);
+      setSize(size);
+    }
+
+    int getBatch() {
+      return this.batch;
+    }
+
+    void setBatch(int batch) {
+      this.batch = batch;
+    }
+
+    /**
+     * @param checkerScope The scope that the limit is being checked from
+     * @return true when the limit can be enforced from the scope of the checker
+     */
+    boolean canEnforceBatchLimitFromScope(LimitScope checkerScope) {
+      return LimitScope.BETWEEN_CELLS.canEnforceLimitFromScope(checkerScope);
+    }
+
+    long getSize() {
+      return this.size;
+    }
+
+    void setSize(long size) {
+      this.size = size;
+    }
+
+    /**
+     * @return {@link LimitScope} indicating scope in which the size limit is enforced
+     */
+    LimitScope getSizeScope() {
+      return this.sizeScope;
+    }
+
+    /**
+     * Change the scope in which the size limit is enforced
+     */
+    void setSizeScope(LimitScope scope) {
+      this.sizeScope = scope;
+    }
+
+    /**
+     * @param checkerScope The scope that the limit is being checked from
+     * @return true when the limit can be enforced from the scope of the checker
+     */
+    boolean canEnforceSizeLimitFromScope(LimitScope checkerScope) {
+      return this.sizeScope.canEnforceLimitFromScope(checkerScope);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("{");
+
+      sb.append("batch:");
+      sb.append(batch);
+
+      sb.append(", size:");
+      sb.append(size);
+
+      sb.append(", sizeScope:");
+      sb.append(sizeScope);
+
+      sb.append("}");
+      return sb.toString();
+    }
+  }
+}
\ No newline at end of file
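
For reference, a minimal invoker-side sketch of the API introduced above (illustrative only;
assumes a RegionScanner "scanner" and a hypothetical consumer "process" obtained elsewhere):

    // Enforce a batch limit of 5 cells and a one-megabyte size limit between cells
    ScannerContext context = ScannerContext.newBuilder()
        .setBatchLimit(5)
        .setSizeLimit(LimitScope.BETWEEN_CELLS, 1024 * 1024)
        .build();
    List<Cell> results = new ArrayList<Cell>();
    boolean moreValues;
    do {
      moreValues = scanner.next(results, context);
      process(results); // hypothetical consumer of each batch
      results.clear();  // reuse the buffer; progress resets since keepProgress defaults to false
    } while (moreValues);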

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
index 831673d..bcc0a90 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
 
 /**
@@ -110,10 +109,14 @@ abstract class StoreFlusher {
       Compactor.CellSink sink, long smallestReadPoint) throws IOException {
     int compactionKVMax =
       conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
+
+    ScannerContext scannerContext =
+        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
+
     List<Cell> kvs = new ArrayList<Cell>();
     boolean hasMore;
     do {
-      hasMore = NextState.hasMoreValues(scanner.next(kvs, compactionKVMax));
+      hasMore = scanner.next(kvs, scannerContext);
       if (!kvs.isEmpty()) {
         for (Cell c : kvs) {
           // If we know that this KV is going to be included always, then let us
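
The flush path above now drains the scanner through a batch-limited ScannerContext rather than a
raw int limit. A minimal sketch of the full consumption loop (illustrative only; sink and
error-handling details elided):

    ScannerContext scannerContext =
        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
    List<Cell> kvs = new ArrayList<Cell>();
    boolean hasMore;
    do {
      hasMore = scanner.next(kvs, scannerContext); // at most compactionKVMax cells per call
      for (Cell c : kvs) {
        sink.append(c); // assumed CellSink-style consumer
      }
      kvs.clear(); // reuse the list between calls
    } while (hasMore);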

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 298d5bc..2cc7c96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
+import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
+import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
 import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -442,45 +444,39 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     }
   }
 
-  /**
-   * Get the next row of values from this Store.
-   * @param outResult
-   * @param limit
-   * @return true if there are more rows, false if scanner is done
-   */
   @Override
-  public NextState next(List<Cell> outResult, int limit) throws IOException {
-    // -1 means no limit
-    return next(outResult, limit, -1);
+  public boolean next(List<Cell> outResult) throws IOException {
+    return next(outResult, NoLimitScannerContext.NO_LIMIT);
   }
 
   /**
    * Get the next row of values from this Store.
    * @param outResult
-   * @param limit
-   * @param remainingResultSize
+   * @param scannerContext
    * @return true if there are more rows, false if scanner is done
    */
   @Override
-  public NextState next(List<Cell> outResult, int limit, long remainingResultSize)
-      throws IOException {
+  public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
     lock.lock();
     try {
+    if (scannerContext == null) {
+      throw new IllegalArgumentException("Scanner context cannot be null");
+    }
     if (checkReseek()) {
-      return NextState.makeState(NextState.State.MORE_VALUES, 0);
+      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
     }
 
     // if the heap was left null, then the scanners had previously run out anyways, close and
     // return.
     if (this.heap == null) {
       close();
-      return NextState.makeState(NextState.State.NO_MORE_VALUES, 0);
+      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
     }
 
     Cell peeked = this.heap.peek();
     if (peeked == null) {
       close();
-      return NextState.makeState(NextState.State.NO_MORE_VALUES, 0);
+      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
     }
 
     // only call setRow if the row changes; avoids confusing the query matcher
@@ -489,16 +485,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     int offset = peeked.getRowOffset();
     short length = peeked.getRowLength();
 
-    // If limit < 0 and remainingResultSize < 0 we can skip the row comparison because we know
-    // the row has changed. Else it is possible we are still traversing the same row so we
-    // must perform the row comparison.
-      if ((limit < 0 && remainingResultSize < 0) || matcher.row == null
-          || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset,
-            matcher.rowLength)) {
-        this.countPerRow = 0;
-        matcher.setRow(row, offset, length);
+    // If no limit exists in the scope LimitScope.BETWEEN_CELLS then we are sure we are changing
+    // rows. Else it is possible we are still traversing the same row so we must perform the row
+    // comparison.
+    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.row == null ||
+        !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) {
+      this.countPerRow = 0;
+      matcher.setRow(row, offset, length);
     }
 
+    // Clear progress away unless invoker has indicated it should be kept.
+    if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
+
     Cell cell;
 
     // Only do a sanity-check if store and comparator are available.
@@ -507,7 +505,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
     int count = 0;
     long totalBytesRead = 0;
-      long totalHeapSize = 0;
 
     LOOP: while((cell = this.heap.peek()) != null) {
       if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
@@ -532,7 +529,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
               this.countPerRow > (storeLimit + storeOffset)) {
             // do what SEEK_NEXT_ROW does.
             if (!matcher.moreRowsMayExistAfter(cell)) {
-              return NextState.makeState(NextState.State.NO_MORE_VALUES, totalHeapSize);
+              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
             }
             seekToNextRow(cell);
             break LOOP;
@@ -542,9 +539,15 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
           // also update metric accordingly
           if (this.countPerRow > storeOffset) {
             outResult.add(cell);
+
+            // Update local tracking information
             count++;
             totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
-            totalHeapSize += CellUtil.estimatedHeapSizeOfWithoutTags(cell);
+
+            // Update the progress of the scanner context
+            scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
+            scannerContext.incrementBatchProgress(1);
+
             if (totalBytesRead > maxRowSize) {
               throw new RowTooBigException("Max row size allowed: " + maxRowSize
               + ", but the row is bigger than that.");
@@ -553,7 +556,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
           if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
             if (!matcher.moreRowsMayExistAfter(cell)) {
-              return NextState.makeState(NextState.State.NO_MORE_VALUES, totalHeapSize);
+              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
             }
             seekToNextRow(cell);
           } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
@@ -562,26 +565,26 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
             this.heap.next();
           }
 
-          if (limit > 0 && (count == limit)) {
+          if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
             break LOOP;
           }
-          if (remainingResultSize > 0 && (totalHeapSize >= remainingResultSize)) {
+          if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
             break LOOP;
           }
           continue;
 
         case DONE:
-          return NextState.makeState(NextState.State.MORE_VALUES, totalHeapSize);
+          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
 
         case DONE_SCAN:
           close();
-          return NextState.makeState(NextState.State.NO_MORE_VALUES, totalHeapSize);
+          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
 
         case SEEK_NEXT_ROW:
          // This is just a relatively simple end-of-scan fix, to short-circuit
          // the scan if there is an endKey in the scan.
           if (!matcher.moreRowsMayExistAfter(cell)) {
-            return NextState.makeState(NextState.State.NO_MORE_VALUES, totalHeapSize);
+            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
           }
 
           seekToNextRow(cell);
@@ -611,12 +614,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     }
 
     if (count > 0) {
-      return NextState.makeState(NextState.State.MORE_VALUES, totalHeapSize);
+      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
     }
 
     // No more keys
     close();
-    return NextState.makeState(NextState.State.NO_MORE_VALUES, totalHeapSize);
+    return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
     } finally {
       lock.unlock();
     }
@@ -655,11 +658,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     return qcode;
   }
 
-  @Override
-  public NextState next(List<Cell> outResult) throws IOException {
-    return next(outResult, -1);
-  }
-
   // Implementation of ChangedReadersObserver
   @Override
   public void updateReaders() throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index ae820b5..d1bb657 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
@@ -246,10 +246,13 @@ public abstract class Compactor {
         store.getRegionInfo().getRegionNameAsString() + "#" + store.getFamily().getNameAsString();
     long now = 0;
     boolean hasMore;
+    ScannerContext scannerContext =
+        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
+
     throughputController.start(compactionName);
     try {
       do {
-        hasMore = NextState.hasMoreValues(scanner.next(cells, compactionKVMax));
+        hasMore = scanner.next(cells, scannerContext);
         if (LOG.isDebugEnabled()) {
           now = EnvironmentEdgeManager.currentTime();
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
index a01f8a1..fafc5a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
@@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -369,7 +368,7 @@ public class AccessControlLists {
       while (true) {
         List<Cell> row = new ArrayList<Cell>();
 
-        boolean hasNext = NextState.hasMoreValues(iScanner.next(row));
+        boolean hasNext = iScanner.next(row);
         ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
         byte[] entry = null;
         for (Cell kv : row) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 2bab7e8..cd8f5ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -87,11 +87,11 @@ import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@@ -807,10 +807,12 @@ public class AccessController extends BaseMasterAndRegionObserver
     boolean foundColumn = false;
     try {
       boolean more = false;
+      ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(1).build();
+
       do {
         cells.clear();
         // scan with limit as 1 to hold down memory use on wide rows
-        more = NextState.hasMoreValues(scanner.next(cells, 1));
+        more = scanner.next(cells, scannerContext);
         for (Cell cell: cells) {
           if (LOG.isTraceEnabled()) {
             LOG.trace("Found cell " + cell);

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
index 7d1ff0d..0d5b27e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
@@ -568,7 +567,7 @@ public abstract class HBaseTestCase extends TestCase {
     @Override
     public boolean next(List<Cell> results)
     throws IOException {
-      return NextState.hasMoreValues(scanner.next(results));
+      return scanner.next(results);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
index e7c3813..eef955e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
@@ -409,6 +409,7 @@ public class TestPartialResultsFromClientSide {
     scan.setBatch(batch);
     ResultScanner scanner = TABLE.getScanner(scan);
     Result result = scanner.next();
+    int repCount = 0;
 
     while ((result = scanner.next()) != null) {
       assertTrue(result.rawCells() != null);
@@ -416,11 +417,12 @@ public class TestPartialResultsFromClientSide {
       if (result.isPartial()) {
         final String error =
             "Cells:" + result.rawCells().length + " Batch size:" + batch
-                + " cellsPerPartialResult:" + cellsPerPartialResult;
+                + " cellsPerPartialResult:" + cellsPerPartialResult + " rep:" + repCount;
         assertTrue(error, result.rawCells().length <= Math.min(batch, cellsPerPartialResult));
       } else {
         assertTrue(result.rawCells().length <= batch);
       }
+      repCount++;
     }
 
     scanner.close();
@@ -458,7 +460,7 @@ public class TestPartialResultsFromClientSide {
       do {
         partialResult = partialScanner.next();
         partials.add(partialResult);
-      } while (partialResult.isPartial());
+      } while (partialResult != null && partialResult.isPartial());
 
       completeResult = Result.createCompleteResult(partials);
       oneShotResult = oneShotScanner.next();
@@ -696,7 +698,7 @@ public class TestPartialResultsFromClientSide {
       LOG.info("r2: " + r2);
     }
 
-    final String failureMessage = "Results r1:" + r1 + " r2:" + r2 + " are not equivalent";
+    final String failureMessage = "Results r1:" + r1 + " \nr2:" + r2 + " are not equivalent";
     if (r1 == null && r2 == null) fail(failureMessage);
     else if (r1 == null || r2 == null) fail(failureMessage);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIntraRowPagination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIntraRowPagination.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIntraRowPagination.java
index cdfb774..1f6dc98 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIntraRowPagination.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIntraRowPagination.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.HTestConst;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -94,7 +93,7 @@ public class TestIntraRowPagination {
       RegionScanner scanner = region.getScanner(scan);
       List<Cell> kvListScan = new ArrayList<Cell>();
       List<Cell> results = new ArrayList<Cell>();
-      while (NextState.hasMoreValues(scanner.next(results)) || !results.isEmpty()) {
+      while (scanner.next(results) || !results.isEmpty()) {
         kvListScan.addAll(results);
         results.clear();
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index efc8db2..bfc1230 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableNotFoundException;
@@ -659,25 +660,34 @@ public class TestReplicasClient {
   private void runMultipleScansOfOneType(boolean reversed, boolean small) throws Exception {
     openRegion(hriSecondary);
     int NUMROWS = 100;
+    int NUMCOLS = 10;
     try {
       for (int i = 0; i < NUMROWS; i++) {
         byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
-        Put p = new Put(b1);
-        p.add(f, b1, b1);
-        table.put(p);
+        for (int col = 0; col < NUMCOLS; col++) {
+          Put p = new Put(b1);
+          String qualifier = "qualifier" + col;
+          KeyValue kv = new KeyValue(b1, f, qualifier.getBytes());
+          p.add(kv);
+          table.put(p);
+        }
       }
       LOG.debug("PUT done");
       int caching = 20;
+      long maxResultSize = Long.MAX_VALUE;
+
       byte[] start;
       if (reversed) start = Bytes.toBytes("testUseRegionWithReplica" + (NUMROWS - 1));
       else start = Bytes.toBytes("testUseRegionWithReplica" + 0);
 
-      scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, start, NUMROWS, false, false);
+      scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize,
+        start, NUMROWS, NUMCOLS, false, false);
 
-      //Even if we were to slow the server down, unless we ask for stale
-      //we won't get it
+      // Even if we were to slow the server down, unless we ask for stale
+      // we won't get it
       SlowMeCopro.sleepTime.set(5000);
-      scanWithReplicas(reversed, small, Consistency.STRONG, caching, start, NUMROWS, false, false);
+      scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, NUMROWS,
+        NUMCOLS, false, false);
       SlowMeCopro.sleepTime.set(0);
 
       flushRegion(hriPrimary);
@@ -686,13 +696,32 @@ public class TestReplicasClient {
 
       //Now set the flag to get a response even if stale
       SlowMeCopro.sleepTime.set(5000);
-      scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, start, NUMROWS, true, false);
+      scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize,
+        start, NUMROWS, NUMCOLS, true, false);
       SlowMeCopro.sleepTime.set(0);
 
       // now make some 'next' calls slow
       SlowMeCopro.slowDownNext.set(true);
       SlowMeCopro.countOfNext.set(0);
-      scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, start, NUMROWS, true, true);
+      scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start,
+        NUMROWS, NUMCOLS, true, true);
+      SlowMeCopro.slowDownNext.set(false);
+      SlowMeCopro.countOfNext.set(0);
+
+      // Make sure we do not get stale data..
+      SlowMeCopro.sleepTime.set(5000);
+      scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize,
+        start, NUMROWS, NUMCOLS, false, false);
+      SlowMeCopro.sleepTime.set(0);
+
+      // While the next calls are slow, set maxResultSize to 1 so that some partial results will be
+      // returned from the server before the replica switch occurs.
+      maxResultSize = 1;
+      SlowMeCopro.slowDownNext.set(true);
+      SlowMeCopro.countOfNext.set(0);
+      scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start,
+        NUMROWS, NUMCOLS, true, true);
+      maxResultSize = Long.MAX_VALUE;
       SlowMeCopro.slowDownNext.set(false);
       SlowMeCopro.countOfNext.set(0);
     } finally {
@@ -710,33 +739,60 @@ public class TestReplicasClient {
   }
 
   private void scanWithReplicas(boolean reversed, boolean small, Consistency consistency,
-      int caching, byte[] startRow, int numRows, boolean staleExpected, boolean slowNext)
+      int caching, long maxResultSize, byte[] startRow, int numRows, int numCols,
+      boolean staleExpected, boolean slowNext)
           throws Exception {
     Scan scan = new Scan(startRow);
     scan.setCaching(caching);
+    scan.setMaxResultSize(maxResultSize);
     scan.setReversed(reversed);
     scan.setSmall(small);
     scan.setConsistency(consistency);
     ResultScanner scanner = table.getScanner(scan);
     Iterator<Result> iter = scanner.iterator();
+
+    // Map of row keys that we have seen so far
     HashMap<String, Boolean> map = new HashMap<String, Boolean>();
-    int count = 0;
+
+    // Tracked metrics
+    int rowCount = 0;
+    int cellCount = 0;
     int countOfStale = 0;
+
     while (iter.hasNext()) {
-      count++;
+      rowCount++;
       Result r = iter.next();
-      if (map.containsKey(new String(r.getRow()))) {
+      String row = new String(r.getRow());
+
+      if (map.containsKey(row)) {
         throw new Exception("Unexpected scan result. Repeated row " + Bytes.toString(r.getRow()));
       }
-      map.put(new String(r.getRow()), true);
+
+      map.put(row, true);
+
+      for (Cell cell : r.rawCells()) {
+        cellCount++;
+      }
+
       if (!slowNext) Assert.assertTrue(r.isStale() == staleExpected);
       if (r.isStale()) countOfStale++;
     }
-    LOG.debug("Count of rows " + count + " num rows expected " + numRows);
-    Assert.assertTrue(count == numRows);
+    Assert.assertTrue("Count of rows " + rowCount + " num rows expected " + numRows,
+      rowCount == numRows);
+    Assert.assertTrue("Count of cells: " + cellCount + " cells expected: " + numRows * numCols,
+      cellCount == (numRows * numCols));
+
     if (slowNext) {
       LOG.debug("Count of Stale " + countOfStale);
-      Assert.assertTrue(countOfStale > 1 && countOfStale < numRows);
+      Assert.assertTrue(countOfStale > 1);
+
+      // If the scan was configured in such a way that a full row was NOT retrieved before the
+      // replica switch occurred, then it is possible that all rows were stale
+      if (maxResultSize != Long.MAX_VALUE) {
+        Assert.assertTrue(countOfStale <= numRows);
+      } else {
+        Assert.assertTrue(countOfStale < numRows);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
index 68053c0..8aa8da1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationP
 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.protobuf.RpcCallback;
@@ -89,7 +88,7 @@ implements Coprocessor, CoprocessorService {
       boolean hasMore = false;
       do {
         curVals.clear();
-        hasMore = NextState.hasMoreValues(scanner.next(curVals));
+        hasMore = scanner.next(curVals);
         for (Cell kv : curVals) {
           if (CellUtil.matchingQualifier(kv, qualifier)) {
             sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java
index c9a628a..4315946 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointNullResponse.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationW
 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.SumResponse;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -98,7 +97,7 @@ implements Coprocessor, CoprocessorService  {
       boolean hasMore = false;
       do {
         curVals.clear();
-        hasMore = NextState.hasMoreValues(scanner.next(curVals));
+        hasMore = scanner.next(curVals);
         for (Cell kv : curVals) {
           if (CellUtil.matchingQualifier(kv, qualifier)) {
             sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java
index 0c4d076..54289ef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpointWithErrors.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationW
 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.SumResponse;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -98,7 +97,7 @@ implements Coprocessor, CoprocessorService  {
       boolean hasMore = false;
       do {
         curVals.clear();
-        hasMore = NextState.hasMoreValues(scanner.next(curVals));
+        hasMore = scanner.next(curVals);
         for (Cell kv : curVals) {
           if (CellUtil.matchingQualifier(kv, qualifier)) {
             sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
index 10ecae3..a8b5456 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.SplitTransaction;
 import org.apache.hadoop.hbase.regionserver.SplitTransactionFactory;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -87,36 +88,26 @@ public class TestCoprocessorInterface {
     }
 
     @Override
-    public NextState next(List<Cell> results) throws IOException {
+    public boolean next(List<Cell> results) throws IOException {
       return delegate.next(results);
     }
 
     @Override
-    public NextState next(List<Cell> result, int limit) throws IOException {
-      return delegate.next(result, limit);
-    }
-
-    @Override
-    public NextState next(List<Cell> result, int limit, long remainingResultSize)
+    public boolean next(List<Cell> result, ScannerContext scannerContext)
         throws IOException {
-      return delegate.next(result, limit, remainingResultSize);
+      return delegate.next(result, scannerContext);
     }
 
     @Override
-    public NextState nextRaw(List<Cell> result)
+    public boolean nextRaw(List<Cell> result)
         throws IOException {
       return delegate.nextRaw(result);
     }
 
     @Override
-    public NextState nextRaw(List<Cell> result, int limit) throws IOException {
-      return delegate.nextRaw(result, limit);
-    }
-
-    @Override
-    public NextState nextRaw(List<Cell> result, int limit, long remainingResultSize)
+    public boolean nextRaw(List<Cell> result, ScannerContext context)
         throws IOException {
-      return delegate.nextRaw(result, limit, remainingResultSize);
+      return delegate.nextRaw(result, context);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index a4963ae..438cf2d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -66,8 +66,10 @@ import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
@@ -433,24 +435,17 @@ public class TestRegionObserverInterface {
         Store store, final InternalScanner scanner, final ScanType scanType) {
       return new InternalScanner() {
         @Override
-        public NextState next(List<Cell> results) throws IOException {
-          return next(results, -1);
+        public boolean next(List<Cell> results) throws IOException {
+          return next(results, NoLimitScannerContext.NO_LIMIT);
         }
 
         @Override
-        public NextState next(List<Cell> results, int limit) throws IOException {
-          return next(results, limit, -1);
-        }
-
-        @Override
-        public NextState next(List<Cell> results, int limit, long remainingResultSize)
+        public boolean next(List<Cell> results, ScannerContext scannerContext)
             throws IOException {
           List<Cell> internalResults = new ArrayList<Cell>();
           boolean hasMore;
-          NextState state;
           do {
-            state = scanner.next(internalResults, limit, remainingResultSize);
-            hasMore = state != null && state.hasMoreValues();
+            hasMore = scanner.next(internalResults, scannerContext);
             if (!internalResults.isEmpty()) {
               long row = Bytes.toLong(CellUtil.cloneValue(internalResults.get(0)));
               if (row % 2 == 0) {
@@ -465,7 +460,7 @@ public class TestRegionObserverInterface {
           if (!internalResults.isEmpty()) {
             results.addAll(internalResults);
           }
-          return state;
+          return hasMore;
         }
 
         @Override

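Worth pulling out of the hunk above: the old loop had to guard against a null NextState before asking hasMoreValues(), and that guard disappears entirely with the boolean return. A minimal sketch of the new call shape (class and method names are illustrative, not part of the patch):

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;

final class CallShapeExample {
  /**
   * Reads one batch of cells into results.
   * Pre-HBASE-11544 this took (limit, remainingResultSize) and returned a
   * NextState that needed "state != null && state.hasMoreValues()"; now the
   * boolean return is the answer itself.
   */
  static boolean readBatch(InternalScanner scanner, ScannerContext context,
      List<Cell> results) throws IOException {
    return scanner.next(results, context);
  }
}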
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPrefixFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPrefixFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPrefixFilter.java
index abd9921..828842d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPrefixFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPrefixFilter.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.testclassification.FilterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -105,7 +104,8 @@ public class TestColumnPrefixFilter {
 
         InternalScanner scanner = region.getScanner(scan);
         List<Cell> results = new ArrayList<Cell>();
-        while (NextState.hasMoreValues(scanner.next(results)));
+        while (scanner.next(results))
+          ;
         assertEquals(prefixMap.get(s).size(), results.size());
       }
     } finally {
@@ -170,7 +170,8 @@ public class TestColumnPrefixFilter {
 
         InternalScanner scanner = region.getScanner(scan);
         List<Cell> results = new ArrayList<Cell>();
-        while (NextState.hasMoreValues(scanner.next(results)));
+        while (scanner.next(results))
+          ;
         assertEquals(prefixMap.get(s).size(), results.size());
       }
     } finally {

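Also visible in the two hunks above: next(List) appends to the passed list rather than replacing it, which is why the empty-body while loop leaves every matching cell in results for the size assertion that follows. If the idiom reads too tersely, a small helper spells it out (the helper is illustrative, not part of the patch):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.InternalScanner;

final class DrainExample {
  /** Reads every remaining cell; equivalent to "while (scanner.next(results));". */
  static List<Cell> drain(InternalScanner scanner) throws IOException {
    List<Cell> results = new ArrayList<Cell>();
    while (scanner.next(results)) {
      // Cells accumulate in results; keep calling until no rows remain.
    }
    return results;
  }
}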
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestDependentColumnFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestDependentColumnFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestDependentColumnFilter.java
index 97f0874..add549a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestDependentColumnFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestDependentColumnFilter.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.testclassification.FilterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -151,7 +150,7 @@ public class TestDependentColumnFilter {
     int i = 0;
     int cells = 0;
     for (boolean done = true; done; i++) {
-      done = NextState.hasMoreValues(scanner.next(results));
+      done = scanner.next(results);
       Arrays.sort(results.toArray(new KeyValue[results.size()]),
           KeyValue.COMPARATOR);
       LOG.info("counter=" + i + ", " + results);

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java
index 82ea5d4..5fcf64e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.FilterList.Operator;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.testclassification.FilterTests;
@@ -503,7 +502,7 @@ public class TestFilter {
     InternalScanner scanner = this.region.getScanner(s);
     int scannerCounter = 0;
     while (true) {
-      boolean isMoreResults = NextState.hasMoreValues(scanner.next(new ArrayList<Cell>()));
+      boolean isMoreResults = scanner.next(new ArrayList<Cell>());
       scannerCounter++;
 
       if (scannerCounter >= pageSize) {
@@ -532,7 +531,7 @@ public class TestFilter {
     InternalScanner scanner = this.region.getScanner(s);
     while (true) {
       ArrayList<Cell> values = new ArrayList<Cell>();
-      boolean isMoreResults = NextState.hasMoreValues(scanner.next(values));
+      boolean isMoreResults = scanner.next(values);
       if (!isMoreResults
           || !Bytes.toString(values.get(0).getRow()).startsWith(prefix)) {
         Assert.assertTrue(
@@ -566,7 +565,7 @@ public class TestFilter {
     InternalScanner scanner = this.region.getScanner(s);
     int scannerCounter = 0;
     while (true) {
-      boolean isMoreResults = NextState.hasMoreValues(scanner.next(new ArrayList<Cell>()));
+      boolean isMoreResults = scanner.next(new ArrayList<Cell>());
       scannerCounter++;
 
       if (scannerCounter >= pageSize) {
@@ -644,7 +643,7 @@ public class TestFilter {
     InternalScanner scanner = this.region.getScanner(s);
     while (true) {
       ArrayList<Cell> values = new ArrayList<Cell>();
-      boolean isMoreResults = NextState.hasMoreValues(scanner.next(values));
+      boolean isMoreResults = scanner.next(values);
       if (!isMoreResults || !Bytes.toString(CellUtil.cloneRow(values.get(0))).startsWith(prefix)) {
         assertTrue("The WhileMatchFilter should now filter all remaining", filter.filterAllRemaining());
       }
@@ -673,7 +672,7 @@ public class TestFilter {
     InternalScanner scanner = this.region.getScanner(s);
     while (true) {
       ArrayList<Cell> values = new ArrayList<Cell>();
-      boolean isMoreResults = NextState.hasMoreValues(scanner.next(values));
+      boolean isMoreResults = scanner.next(values);
       assertTrue("The WhileMatchFilter should now filter all remaining", filter.filterAllRemaining());
       if (!isMoreResults) {
         break;
@@ -1476,7 +1475,7 @@ public class TestFilter {
     InternalScanner scanner = testRegion.getScanner(s1);
     List<Cell> results = new ArrayList<Cell>();
     int resultCount = 0;
-    while (NextState.hasMoreValues(scanner.next(results))) {
+    while (scanner.next(results)) {
       resultCount++;
       byte[] row =  CellUtil.cloneRow(results.get(0));
       LOG.debug("Found row: " + Bytes.toStringBinary(row));
@@ -1618,7 +1617,7 @@ public class TestFilter {
     List<Cell> results = new ArrayList<Cell>();
     int i = 0;
     for (boolean done = true; done; i++) {
-      done = NextState.hasMoreValues(scanner.next(results));
+      done = scanner.next(results);
       Arrays.sort(results.toArray(new KeyValue[results.size()]),
           KeyValue.COMPARATOR);
       LOG.info("counter=" + i + ", " + results);
@@ -1640,7 +1639,7 @@ public class TestFilter {
     List<Cell> results = new ArrayList<Cell>();
     int i = 0;
     for (boolean done = true; done; i++) {
-      done = NextState.hasMoreValues(scanner.next(results));
+      done = scanner.next(results);
       Arrays.sort(results.toArray(new KeyValue[results.size()]),
           KeyValue.COMPARATOR);
       LOG.info("counter=" + i + ", " + results);
@@ -1662,7 +1661,7 @@ public class TestFilter {
     int row = 0;
     int idx = 0;
     for (boolean done = true; done; row++) {
-      done = NextState.hasMoreValues(scanner.next(results));
+      done = scanner.next(results);
       Arrays.sort(results.toArray(new KeyValue[results.size()]),
           KeyValue.COMPARATOR);
       if(results.isEmpty()) break;
@@ -1693,7 +1692,7 @@ public class TestFilter {
     int row = 0;
     int idx = 0;
     for (boolean more = true; more; row++) {
-      more = NextState.hasMoreValues(scanner.next(results));
+      more = scanner.next(results);
       Arrays.sort(results.toArray(new KeyValue[results.size()]),
           KeyValue.COMPARATOR);
       if(results.isEmpty()) break;
@@ -2029,7 +2028,7 @@ public class TestFilter {
     List<Cell> results = new ArrayList<Cell>();
     int i = 5;
     for (boolean done = true; done; i++) {
-      done = NextState.hasMoreValues(scanner.next(results));
+      done = scanner.next(results);
       assertTrue(CellUtil.matchingRow(results.get(0), Bytes.toBytes("row" + i)));
       assertEquals(Bytes.toInt(CellUtil.cloneValue(results.get(0))), i%2);
       results.clear();
@@ -2052,7 +2051,7 @@ public class TestFilter {
       assertEquals(Bytes.toInt(CellUtil.cloneValue(results.get(0))), i%2);
       results.clear();
     }
-    assertFalse(NextState.hasMoreValues(scanner.next(results)));
+    assertFalse(scanner.next(results));
     // 3. let's begin to verify nested filter list
     // 3.1 add rowFilter, then add subFilterList
     FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
@@ -2074,7 +2073,7 @@ public class TestFilter {
       assertEquals(Bytes.toInt(CellUtil.cloneValue(results.get(0))), i%2);
       results.clear();
     }
-    assertFalse(NextState.hasMoreValues(scanner.next(results)));
+    assertFalse(scanner.next(results));
     // 3.2 MAGIC here! add subFilterList first, then add rowFilter
     filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
     filterList.addFilter(subFilterList);
@@ -2095,7 +2094,7 @@ public class TestFilter {
       assertEquals(Bytes.toInt(CellUtil.cloneValue(results.get(0))), i%2);
       results.clear();
     }
-    assertFalse(NextState.hasMoreValues(scanner.next(results)));
+    assertFalse(scanner.next(results));
     WAL wal = ((HRegion)testRegion).getWAL();
     ((HRegion)testRegion).close();
     wal.close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestInvocationRecordFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestInvocationRecordFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestInvocationRecordFilter.java
index b88bbbf..a8651d8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestInvocationRecordFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestInvocationRecordFilter.java
@@ -34,12 +34,11 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.testclassification.FilterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -140,7 +139,7 @@ public class TestInvocationRecordFilter {
     List<Cell> actualValues = new ArrayList<Cell>();
     List<Cell> temp = new ArrayList<Cell>();
     InternalScanner scanner = this.region.getScanner(scan);
-    while (NextState.hasMoreValues(scanner.next(temp))) {
+    while (scanner.next(temp)) {
       actualValues.addAll(temp);
       temp.clear();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestMultipleColumnPrefixFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestMultipleColumnPrefixFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestMultipleColumnPrefixFilter.java
index 25f2e88..7b700b7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestMultipleColumnPrefixFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestMultipleColumnPrefixFilter.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.testclassification.FilterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -110,7 +109,7 @@ public class TestMultipleColumnPrefixFilter {
     scan.setFilter(filter);
     List<Cell> results = new ArrayList<Cell>();  
     InternalScanner scanner = region.getScanner(scan);
-    while (NextState.hasMoreValues(scanner.next(results)))
+    while (scanner.next(results))
       ;
     assertEquals(prefixMap.get("p").size() + prefixMap.get("q").size(), results.size());
 
@@ -183,7 +182,7 @@ public class TestMultipleColumnPrefixFilter {
     scan.setFilter(filter);
     List<Cell> results = new ArrayList<Cell>();  
     InternalScanner scanner = region.getScanner(scan);
-    while (NextState.hasMoreValues(scanner.next(results)))
+    while (scanner.next(results))
       ;
     assertEquals(prefixMap.get("p").size() + prefixMap.get("q").size(), results.size());
 
@@ -228,7 +227,7 @@ public class TestMultipleColumnPrefixFilter {
     scan1.setFilter(multiplePrefixFilter);
     List<Cell> results1 = new ArrayList<Cell>();  
     InternalScanner scanner1 = region.getScanner(scan1);
-    while (NextState.hasMoreValues(scanner1.next(results1)))
+    while (scanner1.next(results1))
       ;
     
     ColumnPrefixFilter singlePrefixFilter;
@@ -239,7 +238,7 @@ public class TestMultipleColumnPrefixFilter {
     scan2.setFilter(singlePrefixFilter);
     List<Cell> results2 = new ArrayList<Cell>();  
     InternalScanner scanner2 = region.getScanner(scan2);
-    while (NextState.hasMoreValues(scanner2.next(results2)))
+    while (scanner2.next(results2))
       ;
     
     assertEquals(results1.size(), results2.size());

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTree.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTree.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTree.java
index 1eda567..e31a73b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTree.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTree.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.testclassification.IOTests;
@@ -117,7 +116,7 @@ public class TestPrefixTree {
     RegionScanner scanner = region.getScanner(scan);
     List<Cell> cells = new ArrayList<Cell>();
     for (int i = 0; i < 3; i++) {
-      assertEquals(i < 2, NextState.hasMoreValues(scanner.next(cells)));
+      assertEquals(i < 2, scanner.next(cells));
       CellScanner cellScanner = Result.create(cells).cellScanner();
       while (cellScanner.advance()) {
         assertEquals(rows[i], Bytes.toString(cellScanner.current().getRowArray(), cellScanner
@@ -136,7 +135,7 @@ public class TestPrefixTree {
     scan.setStopRow(Bytes.toBytes("a-b-A-1:"));
     scanner = region.getScanner(scan);
     for (int i = 1; i < 3; i++) {
-      assertEquals(i < 2, NextState.hasMoreValues(scanner.next(cells)));
+      assertEquals(i < 2, scanner.next(cells));
       CellScanner cellScanner = Result.create(cells).cellScanner();
       while (cellScanner.advance()) {
         assertEquals(rows[i], Bytes.toString(cellScanner.current().getRowArray(), cellScanner
@@ -152,7 +151,7 @@ public class TestPrefixTree {
     scan.setStopRow(Bytes.toBytes("a-b-A-1:"));
     scanner = region.getScanner(scan);
     for (int i = 1; i < 3; i++) {
-      assertEquals(i < 2, NextState.hasMoreValues(scanner.next(cells)));
+      assertEquals(i < 2, scanner.next(cells));
       CellScanner cellScanner = Result.create(cells).cellScanner();
       while (cellScanner.advance()) {
         assertEquals(rows[i], Bytes.toString(cellScanner.current().getRowArray(), cellScanner
@@ -167,7 +166,7 @@ public class TestPrefixTree {
     scan.setStartRow(Bytes.toBytes("a-b-A-1-140239"));
     scan.setStopRow(Bytes.toBytes("a-b-A-1:"));
     scanner = region.getScanner(scan);
-    assertFalse(NextState.hasMoreValues(scanner.next(cells)));
+    assertFalse(scanner.next(cells));
     assertFalse(cells.isEmpty());
     scanner.close();
   }
@@ -186,7 +185,7 @@ public class TestPrefixTree {
     Scan scan = new Scan(Bytes.toBytes("obj29995"));
     RegionScanner scanner = region.getScanner(scan);
     List<Cell> cells = new ArrayList<Cell>();
-    assertFalse(NextState.hasMoreValues(scanner.next(cells)));
+    assertFalse(scanner.next(cells));
     assertArrayEquals(Bytes.toBytes("obj3"), Result.create(cells).getRow());
   }
 }

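TestPrefixTree pins down a subtlety the enum used to blur: a false return means no rows remain after this call, not that nothing was read, so the final row's cells can arrive on the very call that returns false (the assertFalse/assertFalse pairs above depend on it). A sketch of a loop that honors that contract (names are illustrative):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.RegionScanner;

final class LastRowExample {
  /** Visits every row, including the one delivered alongside the false return. */
  static int countRows(RegionScanner scanner) throws IOException {
    List<Cell> cells = new ArrayList<Cell>();
    int rows = 0;
    boolean moreRows;
    do {
      cells.clear();
      moreRows = scanner.next(cells);
      if (!cells.isEmpty()) {
        rows++; // may execute with moreRows == false on the last row
      }
    } while (moreRows);
    return rows;
  }
}

The same contract explains the TestScannerSelectionUsingTTL loop further down, which keeps iterating while either the scanner reports more rows or the last call still produced cells.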
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerSelectionUsingKeyRange.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerSelectionUsingKeyRange.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerSelectionUsingKeyRange.java
index 6baadbb..7584cf2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerSelectionUsingKeyRange.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerSelectionUsingKeyRange.java
@@ -37,10 +37,9 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
 import org.junit.Test;
@@ -122,7 +121,7 @@ public class TestScannerSelectionUsingKeyRange {
     cache.clearCache();
     InternalScanner scanner = region.getScanner(scan);
     List<Cell> results = new ArrayList<Cell>();
-    while (NextState.hasMoreValues(scanner.next(results))) {
+    while (scanner.next(results)) {
     }
     scanner.close();
     assertEquals(0, results.size());

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerSelectionUsingTTL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerSelectionUsingTTL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerSelectionUsingTTL.java
index 4e0743d..d5f4bcd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerSelectionUsingTTL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerSelectionUsingTTL.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -140,7 +139,7 @@ public class TestScannerSelectionUsingTTL {
     final int expectedKVsPerRow = numFreshFiles * NUM_COLS_PER_ROW;
     int numReturnedRows = 0;
     LOG.info("Scanning the entire table");
-    while (NextState.hasMoreValues(scanner.next(results)) || results.size() > 0) {
+    while (scanner.next(results) || results.size() > 0) {
       assertEquals(expectedKVsPerRow, results.size());
       ++numReturnedRows;
       results.clear();

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
index 478e239..66e1952 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
@@ -60,11 +60,10 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -467,7 +466,8 @@ public class TestAtomicOperation {
               Scan s = new Scan(row);
               RegionScanner rs = region.getScanner(s);
               List<Cell> r = new ArrayList<Cell>();
-              while(NextState.hasMoreValues(rs.next(r)));
+              while (rs.next(r))
+                ;
               rs.close();
               if (r.size() != 1) {
                 LOG.debug(r);
@@ -561,7 +561,8 @@ public class TestAtomicOperation {
     Scan s = new Scan();
     RegionScanner scanner = region.getScanner(s);
     List<Cell> results = new ArrayList<Cell>();
-    scanner.next(results, 2);
+    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(2).build();
+    scanner.next(results, scannerContext);
     for (Cell keyValue : results) {
       assertEquals("50",Bytes.toString(CellUtil.cloneValue(keyValue)));
     }

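The TestAtomicOperation hunk shows the replacement for the old int-limit overload: scanner.next(results, 2) becomes an explicitly built context. A sketch of driving a whole scan that way, assuming only the builder call the test itself uses (method and class names are illustrative):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;

final class BatchLimitExample {
  /** Counts all cells while materializing at most batchLimit cells per call. */
  static int countCells(RegionScanner scanner, int batchLimit) throws IOException {
    // The context replaces the old int limit argument to next().
    ScannerContext context = ScannerContext.newBuilder().setBatchLimit(batchLimit).build();
    List<Cell> cells = new ArrayList<Cell>();
    int total = 0;
    boolean moreRows;
    do {
      cells.clear();
      moreRows = scanner.next(cells, context); // fills at most batchLimit cells
      total += cells.size();
    } while (moreRows);
    return total;
  }
}

Note the batch limit counts cells, so a single call can stop mid-row; callers that need whole rows should aggregate across calls.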
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksScanned.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksScanned.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksScanned.java
index 446c64c..b2ba97c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksScanned.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksScanned.java
@@ -25,15 +25,14 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Assert;
 import org.junit.Before;
@@ -103,7 +102,8 @@ public class TestBlocksScanned extends HBaseTestCase {
 
     InternalScanner s = r.getScanner(scan);
     List<Cell> results = new ArrayList<Cell>();
-    while (NextState.hasMoreValues(s.next(results)));
+    while (s.next(results))
+      ;
     s.close();
 
     int expectResultSize = 'z' - 'a';

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestColumnSeeking.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestColumnSeeking.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestColumnSeeking.java
index c09b32d..1d5c61b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestColumnSeeking.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestColumnSeeking.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -161,7 +160,7 @@ public class TestColumnSeeking {
         }
         InternalScanner scanner = region.getScanner(scan);
         List<Cell> results = new ArrayList<Cell>();
-        while (NextState.hasMoreValues(scanner.next(results)))
+        while (scanner.next(results))
           ;
         assertEquals(kvSet.size(), results.size());
         assertTrue(KeyValueTestUtil.containsIgnoreMvccVersion(results, kvSet));
@@ -273,7 +272,7 @@ public class TestColumnSeeking {
       }
       InternalScanner scanner = region.getScanner(scan);
       List<Cell> results = new ArrayList<Cell>();
-      while (NextState.hasMoreValues(scanner.next(results)))
+      while (scanner.next(results))
         ;
       assertEquals(kvSet.size(), results.size());
       assertTrue(KeyValueTestUtil.containsIgnoreMvccVersion(results, kvSet));

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index e1e5b89..622c145 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -107,7 +106,7 @@ public class TestDefaultMemStore extends TestCase {
     StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
     int count = 0;
     try {
-      while (NextState.hasMoreValues(s.next(result))) {
+      while (s.next(result)) {
         LOG.info(result);
         count++;
         // Row count is same as column count.
@@ -127,7 +126,7 @@ public class TestDefaultMemStore extends TestCase {
     s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
     count = 0;
     try {
-      while (NextState.hasMoreValues(s.next(result))) {
+      while (s.next(result)) {
         LOG.info(result);
         // Assert the stuff is coming out in right order.
         assertTrue(CellUtil.matchingRow(result.get(0), Bytes.toBytes(count)));
@@ -154,7 +153,7 @@ public class TestDefaultMemStore extends TestCase {
     count = 0;
     int snapshotIndex = 5;
     try {
-      while (NextState.hasMoreValues(s.next(result))) {
+      while (s.next(result)) {
         LOG.info(result);
         // Assert the stuff is coming out in right order.
         assertTrue(CellUtil.matchingRow(result.get(0), Bytes.toBytes(count)));
@@ -528,7 +527,7 @@ public class TestDefaultMemStore extends TestCase {
           Bytes.toBytes(startRowId)), scanInfo, scanType, null,
           memstore.getScanners(0));
       List<Cell> results = new ArrayList<Cell>();
-      for (int i = 0; NextState.hasMoreValues(scanner.next(results)); i++) {
+      for (int i = 0; scanner.next(results); i++) {
         int rowId = startRowId + i;
         Cell left = results.get(0);
         byte[] row1 = Bytes.toBytes(rowId);

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ba621e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestGetClosestAtOrBefore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestGetClosestAtOrBefore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestGetClosestAtOrBefore.java
index 416ee28..110cd36 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestGetClosestAtOrBefore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestGetClosestAtOrBefore.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -97,7 +96,7 @@ public class TestGetClosestAtOrBefore extends HBaseTestCase {
     InternalScanner s = mr.getScanner(new Scan());
     try {
       List<Cell> keys = new ArrayList<Cell>();
-        while (NextState.hasMoreValues(s.next(keys))) {
+        while (s.next(keys)) {
         LOG.info(keys);
         keys.clear();
       }
@@ -121,7 +120,7 @@ public class TestGetClosestAtOrBefore extends HBaseTestCase {
     s = mr.getScanner(scan);
     try {
       List<Cell> keys = new ArrayList<Cell>();
-        while (NextState.hasMoreValues(s.next(keys))) {
+        while (s.next(keys)) {
         mr.delete(new Delete(CellUtil.cloneRow(keys.get(0))));
         keys.clear();
       }