You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/01/22 08:57:40 UTC

hbase git commit: HBASE-17489 ClientScanner may send a next request to a RegionScanner which has been exhausted

Repository: hbase
Updated Branches:
  refs/heads/master 9a9e3df85 -> 3abd13dac


HBASE-17489 ClientScanner may send a next request to a RegionScanner which has been exhausted


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3abd13da
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3abd13da
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3abd13da

Branch: refs/heads/master
Commit: 3abd13dacb57927bd44a47632f4bd0c2e2bb87ea
Parents: 9a9e3df
Author: zhangduo <zh...@apache.org>
Authored: Sun Jan 22 10:02:29 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sun Jan 22 16:39:00 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/ClientScanner.java      | 162 ++--
 .../hbase/client/ReversedClientScanner.java     |   8 +-
 .../hadoop/hbase/client/TestClientScanner.java  |  29 +-
 .../hbase/regionserver/RSRpcServices.java       | 963 ++++++++++---------
 4 files changed, 613 insertions(+), 549 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3abd13da/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 283272a..ea91100 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -54,9 +54,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Implements the scanner interface for the HBase client.
- * If there are multiple regions in a table, this scanner will iterate
- * through them all.
+ * Implements the scanner interface for the HBase client. If there are multiple regions in a table,
+ * this scanner will iterate through them all.
  */
 @InterfaceAudience.Private
 public abstract class ClientScanner extends AbstractClientScanner {
@@ -229,15 +228,13 @@ public abstract class ClientScanner extends AbstractClientScanner {
     return false; // unlikely.
   }
 
-  private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException {
-    // If we have just switched replica, don't go to the next scanner yet. Rather, try
-    // the scanner operations on the new replica, from the right point in the scan
-    // Note that when we switched to a different replica we left it at a point
-    // where we just did the "openScanner" with the appropriate startrow
-    if (callable != null && callable.switchedToADifferentReplica()) return true;
-    return nextScanner(nbRows, done);
+  protected final void closeScanner() throws IOException {
+    if (this.callable != null) {
+      this.callable.setClose();
+      call(callable, caller, scannerTimeout);
+      this.callable = null;
+    }
   }
-
   /*
    * Gets a scanner for the next region. If this.currentRegion != null, then we will move to the
    * endrow of this.currentRegion. Else we will get scanner at the scan.getStartRow(). We will go no
@@ -248,11 +245,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
    */
   protected boolean nextScanner(int nbRows, final boolean done) throws IOException {
     // Close the previous scanner if it's open
-    if (this.callable != null) {
-      this.callable.setClose();
-      call(callable, caller, scannerTimeout);
-      this.callable = null;
-    }
+    closeScanner();
 
     // Where to start the next scanner
     byte[] localStartKey;
@@ -371,6 +364,37 @@ public abstract class ClientScanner extends AbstractClientScanner {
     return cache != null ? cache.size() : 0;
   }
 
+  private boolean regionExhausted(Result[] values) {
+    // This means the server tells us the whole scan operation is done. Usually decided by filter.
+    if (values == null) {
+      return true;
+    }
+    // Not a heartbeat message and we get nothing, this means the region is exhausted
+    if (values.length == 0 && !callable.isHeartbeatMessage()) {
+      return true;
+    }
+    // Server tells us that it has no more results for this region. Notice that this flag is get
+    // from the ScanResponse.getMoreResultsInRegion, not ScanResponse.getMoreResults. If the latter
+    // one is false then we will get a null values and quit in the first condition of this method.
+    if (callable.hasMoreResultsContext() && !callable.getServerHasMoreResults()) {
+      return true;
+    }
+    return false;
+  }
+
+  private void closeScannerIfExhausted(boolean exhausted) throws IOException {
+    if (exhausted) {
+      if (!partialResults.isEmpty()) {
+        // XXX: continue if there are partial results. But in fact server should not set
+        // hasMoreResults to false if there are partial results.
+        LOG.warn("Server tells us there is no more results for this region but we still have"
+            + " partialResults, this should not happen, retry on the current scanner anyway");
+      } else {
+        closeScanner();
+      }
+    }
+  }
+
   /**
    * Contact the servers to load more {@link Result}s in the cache.
    */
@@ -380,17 +404,18 @@ public abstract class ClientScanner extends AbstractClientScanner {
     Result[] values = null;
     long remainingResultSize = maxScannerResultSize;
     int countdown = this.caching;
+    // This is possible if we just stopped at the boundary of a region in the previous call.
+    if (callable == null) {
+      if (!nextScanner(countdown, false)) {
+        return;
+      }
+    }
     // We need to reset it if it's a new callable that was created with a countdown in nextScanner
     callable.setCaching(this.caching);
     // This flag is set when we want to skip the result returned. We do
     // this when we reset scanner because it split under us.
     boolean retryAfterOutOfOrderException = true;
-    // We don't expect that the server will have more results for us if
-    // it doesn't tell us otherwise. We rely on the size or count of results
-    boolean serverHasMoreResults = false;
-    boolean allResultsSkipped = false;
-    do {
-      allResultsSkipped = false;
+    for (;;) {
       try {
         // Server returns a null values if scanning is to stop. Else,
         // returns an empty array if scanning is to go on and we've just
@@ -436,7 +461,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
           // Reset the startRow to the row we've seen last so that the new scanner starts at
           // the correct row. Otherwise we may see previously returned rows again.
           // (ScannerCallable by now has "relocated" the correct region)
-          if (!this.lastResult.isPartial() && scan.getBatch() < 0 ) {
+          if (!this.lastResult.isPartial() && scan.getBatch() < 0) {
             if (scan.isReversed()) {
               scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
             } else {
@@ -461,7 +486,10 @@ public abstract class ClientScanner extends AbstractClientScanner {
         // Set this to zero so we don't try and do an rpc and close on remote server when
         // the exception we got was UnknownScanner or the Server is going down.
         callable = null;
-        // This continue will take us to while at end of loop where we will set up new scanner.
+        // reopen the scanner
+        if (!nextScanner(countdown, false)) {
+          break;
+        }
         continue;
       }
       long currentTime = System.currentTimeMillis();
@@ -487,61 +515,58 @@ public abstract class ClientScanner extends AbstractClientScanner {
           remainingResultSize -= estimatedHeapSizeOfResult;
           addEstimatedSize(estimatedHeapSizeOfResult);
           this.lastResult = rs;
-          if (this.lastResult.isPartial() || scan.getBatch() > 0 ) {
+          if (this.lastResult.isPartial() || scan.getBatch() > 0) {
             updateLastCellLoadedToCache(this.lastResult);
           } else {
             this.lastCellLoadedToCache = null;
           }
         }
-        if (cache.isEmpty()) {
-          // all result has been seen before, we need scan more.
-          allResultsSkipped = true;
-          continue;
-        }
       }
+      boolean exhausted = regionExhausted(values);
       if (callable.isHeartbeatMessage()) {
-        if (cache.size() > 0) {
+        if (!cache.isEmpty()) {
           // Caller of this method just wants a Result. If we see a heartbeat message, it means
           // processing of the scan is taking a long time server side. Rather than continue to
           // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
           // unnecesary delays to the caller
           if (LOG.isTraceEnabled()) {
             LOG.trace("Heartbeat message received and cache contains Results."
-                    + " Breaking out of scan loop");
+                + " Breaking out of scan loop");
           }
+          // we know that the region has not been exhausted yet so just break without calling
+          // closeScannerIfExhausted
           break;
         }
-        continue;
       }
-
-      // We expect that the server won't have more results for us when we exhaust
-      // the size (bytes or count) of the results returned. If the server *does* inform us that
-      // there are more results, we want to avoid possiblyNextScanner(...). Only when we actually
-      // get results is the moreResults context valid.
-      if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
-        // Only adhere to more server results when we don't have any partialResults
-        // as it keeps the outer loop logic the same.
-        serverHasMoreResults = callable.getServerHasMoreResults() && partialResults.isEmpty();
+      if (countdown <= 0) {
+        // we have enough result.
+        closeScannerIfExhausted(exhausted);
+        break;
       }
-      // Values == null means server-side filter has determined we must STOP
-      // !partialResults.isEmpty() means that we are still accumulating partial Results for a
-      // row. We should not change scanners before we receive all the partial Results for that
-      // row.
-    } while (allResultsSkipped || (callable != null && callable.isHeartbeatMessage())
-        || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
-        && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))));
-  }
-
-  /**
-   * @param remainingResultSize
-   * @param remainingRows
-   * @param regionHasMoreResults
-   * @return true when the current region has been exhausted. When the current region has been
-   *         exhausted, the region must be changed before scanning can continue
-   */
-  private boolean doneWithRegion(long remainingResultSize, int remainingRows,
-      boolean regionHasMoreResults) {
-    return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults;
+      if (remainingResultSize <= 0) {
+        if (!cache.isEmpty()) {
+          closeScannerIfExhausted(exhausted);
+          break;
+        } else {
+          // we have reached the max result size but we still can not find anything to return to the
+          // user. Reset the maxResultSize and try again.
+          remainingResultSize = maxScannerResultSize;
+        }
+      }
+      // we are done with the current region
+      if (exhausted) {
+        if (!partialResults.isEmpty()) {
+          // XXX: continue if there are partial results. But in fact server should not set
+          // hasMoreResults to false if there are partial results.
+          LOG.warn("Server tells us there is no more results for this region but we still have"
+              + " partialResults, this should not happen, retry on the current scanner anyway");
+          continue;
+        }
+        if (!nextScanner(countdown, values == null)) {
+          break;
+        }
+      }
+    }
   }
 
   protected void addEstimatedSize(long estimatedHeapSizeOfResult) {
@@ -566,9 +591,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
    * @return the list of results that should be added to the cache.
    * @throws IOException
    */
-  protected List<Result>
-      getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage)
-          throws IOException {
+  protected List<Result> getResultsToAddToCache(Result[] resultsFromServer,
+      boolean heartbeatMessage) throws IOException {
     int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
     List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
 
@@ -583,7 +607,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
     // the batch size even though it may not be the last group of cells for that row.
     if (allowPartials || isBatchSet) {
       addResultsToList(resultsToAddToCache, resultsFromServer, 0,
-          (null == resultsFromServer ? 0 : resultsFromServer.length));
+        (null == resultsFromServer ? 0 : resultsFromServer.length));
       return resultsToAddToCache;
     }
 
@@ -769,12 +793,12 @@ public abstract class ClientScanner extends AbstractClientScanner {
   }
 
   /**
-   * Compare two Cells considering reversed scanner.
-   * ReversedScanner only reverses rows, not columns.
+   * Compare two Cells considering reversed scanner. ReversedScanner only reverses rows, not
+   * columns.
    */
   private int compare(Cell a, Cell b) {
-    CellComparator comparator = currentRegion != null && currentRegion.isMetaRegion() ?
-        CellComparator.META_COMPARATOR : CellComparator.COMPARATOR;
+    CellComparator comparator = currentRegion != null && currentRegion.isMetaRegion()
+        ? CellComparator.META_COMPARATOR : CellComparator.COMPARATOR;
     int r = comparator.compareRows(a, b);
     if (r != 0) {
       return this.scan.isReversed() ? -r : r;

http://git-wip-us.apache.org/repos/asf/hbase/blob/3abd13da/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
index 390e236..e1a522a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
@@ -63,13 +63,7 @@ public class ReversedClientScanner extends ClientSimpleScanner {
   protected boolean nextScanner(int nbRows, final boolean done)
       throws IOException {
     // Close the previous scanner if it's open
-    if (this.callable != null) {
-      this.callable.setClose();
-      // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
-      // we do a callWithRetries
-      this.caller.callWithoutRetries(callable, scannerTimeout);
-      this.callable = null;
-    }
+    closeScanner();
 
     // Where to start the next scanner
     byte[] localStartKey;

http://git-wip-us.apache.org/repos/asf/hbase/blob/3abd13da/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
index fd2a393..4319b9a 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
@@ -158,7 +158,8 @@ public class TestClientScanner {
                 ScannerCallableWithReplicas.class);
           switch (count) {
             case 0: // initialize
-            case 2: // close
+            case 2: // detect no more results
+            case 3: // close
               count++;
               return null;
             case 1:
@@ -184,8 +185,10 @@ public class TestClientScanner {
 
       scanner.loadCache();
 
-      // One more call due to initializeScannerInConstruction()
-      inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
+      // One for initializeScannerInConstruction()
+      // One for fetching the results
+      // One for fetching null results and quit as we do not have moreResults hint.
+      inOrder.verify(caller, Mockito.times(3)).callWithoutRetries(
           Mockito.any(RetryingCallable.class), Mockito.anyInt());
 
       assertEquals(1, scanner.cache.size());
@@ -224,7 +227,8 @@ public class TestClientScanner {
             case 1:
               count++;
               callable.setHasMoreResultsContext(true);
-              callable.setServerHasMoreResults(false);
+              // if we set false here the implementation will trigger a close
+              callable.setServerHasMoreResults(true);
               return results;
             default:
               throw new RuntimeException("Expected only 2 invocations");
@@ -291,7 +295,8 @@ public class TestClientScanner {
             case 1:
               count++;
               callable.setHasMoreResultsContext(true);
-              callable.setServerHasMoreResults(false);
+              // if we set false here the implementation will trigger a close
+              callable.setServerHasMoreResults(true);
               return results;
             default:
               throw new RuntimeException("Expected only 2 invocations");
@@ -470,13 +475,14 @@ public class TestClientScanner {
           Mockito.anyInt());
 
       InOrder inOrder = Mockito.inOrder(caller);
+      scanner.setRpcFinished(true);
 
       scanner.loadCache();
 
-      inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
+      inOrder.verify(caller, Mockito.times(3)).callWithoutRetries(
           Mockito.any(RetryingCallable.class), Mockito.anyInt());
 
-      assertEquals(1, scanner.cache.size());
+      assertEquals(2, scanner.cache.size());
       Result r = scanner.cache.poll();
       assertNotNull(r);
       CellScanner cs = r.cellScanner();
@@ -484,15 +490,6 @@ public class TestClientScanner {
       assertEquals(kv1, cs.current());
       assertFalse(cs.advance());
 
-      scanner.setRpcFinished(true);
-
-      inOrder = Mockito.inOrder(caller);
-
-      scanner.loadCache();
-
-      inOrder.verify(caller, Mockito.times(3)).callWithoutRetries(
-          Mockito.any(RetryingCallable.class), Mockito.anyInt());
-
       r = scanner.cache.poll();
       assertNotNull(r);
       cs = r.cellScanner();

http://git-wip-us.apache.org/repos/asf/hbase/blob/3abd13da/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 1e9f16b..a072dce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -38,9 +38,11 @@ import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 
+import org.apache.commons.lang.mutable.MutableObject;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -77,7 +79,6 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
-import org.apache.hadoop.hbase.exceptions.MergeRegionException;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
@@ -96,6 +97,27 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.master.MasterRpcServices;
+import org.apache.hadoop.hbase.quotas.OperationQuota;
+import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
+import org.apache.hadoop.hbase.regionserver.Leases.Lease;
+import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.regionserver.Region.Operation;
+import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
+import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
+import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
+import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeResponse;
@@ -107,10 +129,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegion
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
@@ -134,9 +156,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavor
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
@@ -177,18 +196,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescr
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
-import org.apache.hadoop.hbase.quotas.OperationQuota;
-import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
-import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
-import org.apache.hadoop.hbase.regionserver.Leases.Lease;
-import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
-import org.apache.hadoop.hbase.regionserver.Region.Operation;
-import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
-import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
-import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
-import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.DNS;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -201,13 +208,6 @@ import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.zookeeper.KeeperException;
 
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
-
 /**
  * Implements the regionserver RPC services.
  */
@@ -260,8 +260,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   private final PriorityFunction priority;
 
   private final AtomicLong scannerIdGen = new AtomicLong(0L);
-  private final ConcurrentHashMap<String, RegionScannerHolder> scanners =
-    new ConcurrentHashMap<String, RegionScannerHolder>();
+  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
 
   /**
    * The lease timeout period for client scanners (milliseconds).
@@ -281,11 +280,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   /**
    * An Rpc callback for closing a RegionScanner.
    */
-   static class RegionScannerCloseCallBack implements RpcCallback {
+  private static final class RegionScannerCloseCallBack implements RpcCallback {
 
     private final RegionScanner scanner;
 
-    public RegionScannerCloseCallBack(RegionScanner scanner){
+    public RegionScannerCloseCallBack(RegionScanner scanner) {
       this.scanner = scanner;
     }
 
@@ -347,27 +346,31 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   /**
    * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
    */
-  private static class RegionScannerHolder {
-    private AtomicLong nextCallSeq = new AtomicLong(0);
-    private RegionScanner s;
-    private Region r;
-    final RpcCallback closeCallBack;
-    final RpcCallback shippedCallback;
-
-    public RegionScannerHolder(RegionScanner s, Region r, RpcCallback closeCallBack,
-        RpcCallback shippedCallback) {
+  private static final class RegionScannerHolder {
+
+    private final AtomicLong nextCallSeq = new AtomicLong(0);
+    private final String scannerName;
+    private final RegionScanner s;
+    private final Region r;
+    private final RpcCallback closeCallBack;
+    private final RpcCallback shippedCallback;
+
+    public RegionScannerHolder(String scannerName, RegionScanner s, Region r,
+        RpcCallback closeCallBack, RpcCallback shippedCallback) {
+      this.scannerName = scannerName;
       this.s = s;
       this.r = r;
       this.closeCallBack = closeCallBack;
       this.shippedCallback = shippedCallback;
     }
 
-    private long getNextCallSeq() {
+    public long getNextCallSeq() {
       return nextCallSeq.get();
     }
 
-    private void incNextCallSeq() {
-      nextCallSeq.incrementAndGet();
+    public boolean incNextCallSeq(long currentSeq) {
+      // Use CAS to prevent multiple scan request running on the same scanner.
+      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
     }
   }
 
@@ -476,19 +479,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
   }
 
-  private void addResults(final ScanResponse.Builder builder, final List<Result> results,
-      final RpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
+  private void addResults(ScanResponse.Builder builder, List<Result> results,
+      HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
     builder.setStale(!isDefaultRegion);
-    if (results == null || results.isEmpty()) return;
+    if (results.isEmpty()) return;
     if (clientCellBlockSupported) {
       for (Result res : results) {
         builder.addCellsPerResult(res.size());
         builder.addPartialFlagPerResult(res.isPartial());
       }
-      ((HBaseRpcController)controller).
-        setCellScanner(CellUtil.createCellScanner(results));
+      controller.setCellScanner(CellUtil.createCellScanner(results));
     } else {
-      for (Result res: results) {
+      for (Result res : results) {
         ClientProtos.Result pbr = ProtobufUtil.toResult(res);
         builder.addResults(pbr);
       }
@@ -1131,6 +1133,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
   }
 
+  public
   RegionScanner getScanner(long scannerId) {
     String scannerIdString = Long.toString(scannerId);
     RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
@@ -1202,10 +1205,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     return lastBlock;
   }
 
-  RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r)
+  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r)
       throws LeaseStillHeldException {
     Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
-        new ScannerListener(scannerName));
+      new ScannerListener(scannerName));
     RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, s, lease);
     RpcCallback closeCallback;
     if (s instanceof RpcCallback) {
@@ -1213,7 +1216,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     } else {
       closeCallback = new RegionScannerCloseCallBack(s);
     }
-    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback);
+    RegionScannerHolder rsh =
+        new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback);
     RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
     assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
     return rsh;
@@ -2610,444 +2614,498 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
   }
 
-  /**
-   * Scan data in a table.
-   *
-   * @param controller the RPC controller
-   * @param request the scan request
-   * @throws ServiceException
-   */
-  @Override
-  public ScanResponse scan(final RpcController controller, final ScanRequest request)
-  throws ServiceException {
-    OperationQuota quota = null;
-    Leases.Lease lease = null;
-    String scannerName = null;
-    try {
-      if (!request.hasScannerId() && !request.hasScan()) {
-        throw new DoNotRetryIOException(
-          "Missing required input: scannerId or scan");
-      }
-      long scannerId = -1;
-      if (request.hasScannerId()) {
-        scannerId = request.getScannerId();
-        scannerName = String.valueOf(scannerId);
-      }
-      try {
-        checkOpen();
-      } catch (IOException e) {
-        // If checkOpen failed, server not running or filesystem gone,
-        // cancel this lease; filesystem is gone or we're closing or something.
-        if (scannerName != null) {
-          LOG.debug("Server shutting down and client tried to access missing scanner "
-            + scannerName);
-          if (regionServer.leases != null) {
-            try {
-              regionServer.leases.cancelLease(scannerName);
-            } catch (LeaseException le) {
-              // No problem, ignore
-              if (LOG.isTraceEnabled()) {
-                LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
-              }
-             }
-          }
-        }
-        throw e;
-      }
-      requestCount.increment();
-      rpcScanRequestCount.increment();
-
-      int ttl = 0;
-      Region region = null;
-      RegionScanner scanner = null;
-      RegionScannerHolder rsh = null;
-      boolean moreResults = true;
-      boolean closeScanner = false;
-      boolean isSmallScan = false;
-      ScanResponse.Builder builder = ScanResponse.newBuilder();
-      if (request.hasCloseScanner()) {
-        closeScanner = request.getCloseScanner();
-      }
-      int rows = closeScanner ? 0 : 1;
-      if (request.hasNumberOfRows()) {
-        rows = request.getNumberOfRows();
-      }
-      if (request.hasScannerId()) {
-        rsh = scanners.get(scannerName);
-        if (rsh == null) {
-          LOG.warn("Client tried to access missing scanner " + scannerName);
-          throw new UnknownScannerException(
+  // This is used to keep compatible with the old client implementation. Consider remove it if we
+  // decide to drop the support of the client that still sends close request to a region scanner
+  // which has already been exhausted.
+  @Deprecated
+  private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {
+
+    private static final long serialVersionUID = -4305297078988180130L;
+
+    @Override
+    public Throwable fillInStackTrace() {
+      return this;
+    }
+  };
+
+  private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
+    String scannerName = Long.toString(request.getScannerId());
+    RegionScannerHolder rsh = scanners.get(scannerName);
+    if (rsh == null) {
+      // just ignore the close request if scanner does not exists.
+      if (request.hasCloseScanner() && request.getCloseScanner()) {
+        throw SCANNER_ALREADY_CLOSED;
+      } else {
+        LOG.warn("Client tried to access missing scanner " + scannerName);
+        throw new UnknownScannerException(
             "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
                 + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
                 + "long wait between consecutive client checkins, c) Server may be closing down, "
                 + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
                 + "possible fix would be increasing the value of"
                 + "'hbase.client.scanner.timeout.period' configuration.");
+      }
+    }
+    HRegionInfo hri = rsh.s.getRegionInfo();
+    // Yes, should be the same instance
+    if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) {
+      String msg = "Region was re-opened after the scanner" + scannerName + " was created: "
+          + hri.getRegionNameAsString();
+      LOG.warn(msg + ", closing...");
+      scanners.remove(scannerName);
+      try {
+        rsh.s.close();
+      } catch (IOException e) {
+        LOG.warn("Getting exception closing " + scannerName, e);
+      } finally {
+        try {
+          regionServer.leases.cancelLease(scannerName);
+        } catch (LeaseException e) {
+          LOG.warn("Getting exception closing " + scannerName, e);
         }
-        scanner = rsh.s;
-        HRegionInfo hri = scanner.getRegionInfo();
-        region = regionServer.getRegion(hri.getRegionName());
-        if (region != rsh.r) { // Yes, should be the same instance
-          throw new NotServingRegionException("Region was re-opened after the scanner"
-            + scannerName + " was created: " + hri.getRegionNameAsString());
-        }
-      } else {
-        region = getRegion(request.getRegion());
-        ClientProtos.Scan protoScan = request.getScan();
-        boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
-        Scan scan = ProtobufUtil.toScan(protoScan);
-        // if the request doesn't set this, get the default region setting.
-        if (!isLoadingCfsOnDemandSet) {
-          scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
-        }
+      }
+      throw new NotServingRegionException(msg);
+    }
+    return rsh;
+  }
 
-        isSmallScan = scan.isSmall();
-        if (!scan.hasFamilies()) {
-          // Adding all families to scanner
-          for (byte[] family: region.getTableDesc().getFamiliesKeys()) {
-            scan.addFamily(family);
-          }
-        }
+  private Pair<RegionScannerHolder, Boolean> newRegionScanner(ScanRequest request,
+      ScanResponse.Builder builder) throws IOException {
+    Region region = getRegion(request.getRegion());
+    ClientProtos.Scan protoScan = request.getScan();
+    boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
+    Scan scan = ProtobufUtil.toScan(protoScan);
+    // if the request doesn't set this, get the default region setting.
+    if (!isLoadingCfsOnDemandSet) {
+      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
+    }
 
-        if (region.getCoprocessorHost() != null) {
-          scanner = region.getCoprocessorHost().preScannerOpen(scan);
-        }
-        if (scanner == null) {
-          scanner = region.getScanner(scan);
-        }
-        if (region.getCoprocessorHost() != null) {
-          scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
-        }
-        scannerId = this.scannerIdGen.incrementAndGet();
-        scannerName = String.valueOf(scannerId);
-        rsh = addScanner(scannerName, scanner, region);
-        ttl = this.scannerLeaseTimeoutPeriod;
-        builder.setMvccReadPoint(scanner.getMvccReadPoint());
-      }
-      if (request.hasRenew() && request.getRenew()) {
-        rsh = scanners.get(scannerName);
-        lease = regionServer.leases.removeLease(scannerName);
-        if (lease != null && rsh != null) {
-          regionServer.leases.addLease(lease);
-          // Increment the nextCallSeq value which is the next expected from client.
-          rsh.incNextCallSeq();
-        }
-        return builder.build();
+    if (!scan.hasFamilies()) {
+      // Adding all families to scanner
+      for (byte[] family : region.getTableDesc().getFamiliesKeys()) {
+        scan.addFamily(family);
       }
-      RpcCallContext context = RpcServer.getCurrentCall();
-      Object lastBlock = null;
+    }
+    RegionScanner scanner = null;
+    if (region.getCoprocessorHost() != null) {
+      scanner = region.getCoprocessorHost().preScannerOpen(scan);
+    }
+    if (scanner == null) {
+      scanner = region.getScanner(scan);
+    }
+    if (region.getCoprocessorHost() != null) {
+      scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
+    }
+    long scannerId = this.scannerIdGen.incrementAndGet();
+    builder.setScannerId(scannerId);
+    builder.setMvccReadPoint(scanner.getMvccReadPoint());
+    builder.setTtl(scannerLeaseTimeoutPeriod);
+    String scannerName = String.valueOf(scannerId);
+    return Pair.newPair(addScanner(scannerName, scanner, region), scan.isSmall());
+  }
 
-      quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
-      long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
+  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
+      throws OutOfOrderScannerNextException {
+    // if nextCallSeq does not match throw Exception straight away. This needs to be
+    // performed even before checking of Lease.
+    // See HBASE-5974
+    if (request.hasNextCallSeq()) {
+      long callSeq = request.getNextCallSeq();
+      if (!rsh.incNextCallSeq(callSeq)) {
+        throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.getNextCallSeq()
+            + " But the nextCallSeq got from client: " + request.getNextCallSeq() + "; request="
+            + TextFormat.shortDebugString(request));
+      }
+    }
+  }
 
-      if (rows > 0) {
-        // if nextCallSeq does not match throw Exception straight away. This needs to be
-        // performed even before checking of Lease.
-        // See HBASE-5974
-        if (request.hasNextCallSeq()) {
-          if (rsh != null) {
-            if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
-              throw new OutOfOrderScannerNextException(
-                "Expected nextCallSeq: " + rsh.getNextCallSeq()
-                + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
-                "; request=" + TextFormat.shortDebugString(request));
-            }
-            // Increment the nextCallSeq value which is the next expected from client.
-            rsh.incNextCallSeq();
+  private void addScannerLeaseBack(Leases.Lease lease) {
+    try {
+      regionServer.leases.addLease(lease);
+    } catch (LeaseStillHeldException e) {
+      // should not happen as the scanner id is unique.
+      throw new AssertionError(e);
+    }
+  }
+
+  private long getTimeLimit(HBaseRpcController controller, boolean allowHeartbeatMessages) {
+    // Set the time limit to be half of the more restrictive timeout value (one of the
+    // timeout values must be positive). In the event that both values are positive, the
+    // more restrictive of the two is used to calculate the limit.
+    if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
+      long timeLimitDelta;
+      if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
+        timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
+      } else {
+        timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
+      }
+      if (controller != null && controller.getCallTimeout() > 0) {
+        timeLimitDelta = Math.min(timeLimitDelta, controller.getCallTimeout());
+      }
+      // Use half of whichever timeout value was more restrictive... But don't allow
+      // the time limit to be less than the allowable minimum (could cause an
+      // immediatate timeout before scanning any data).
+      timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
+      // XXX: Can not use EnvironmentEdge here because TestIncrementTimeRange use a
+      // ManualEnvironmentEdge. Consider using System.nanoTime instead.
+      return System.currentTimeMillis() + timeLimitDelta;
+    }
+    // Default value of timeLimit is negative to indicate no timeLimit should be
+    // enforced.
+    return -1L;
+  }
+
+  // return whether we have more results in region.
+  private boolean scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
+      boolean isSmallScan, long maxQuotaResultSize, int rows, List<Result> results,
+      ScanResponse.Builder builder, MutableObject lastBlock, RpcCallContext context)
+      throws IOException {
+    Region region = rsh.r;
+    RegionScanner scanner = rsh.s;
+    long maxResultSize;
+    if (scanner.getMaxResultSize() > 0) {
+      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
+    } else {
+      maxResultSize = maxQuotaResultSize;
+    }
+    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
+    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
+    // arbitrary 32. TODO: keep record of general size of results being returned.
+    List<Cell> values = new ArrayList<Cell>(32);
+    region.startRegionOperation(Operation.SCAN);
+    try {
+      int i = 0;
+      long before = EnvironmentEdgeManager.currentTime();
+      synchronized (scanner) {
+        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
+        boolean clientHandlesPartials =
+            request.hasClientHandlesPartials() && request.getClientHandlesPartials();
+        boolean clientHandlesHeartbeats =
+            request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
+
+        // On the server side we must ensure that the correct ordering of partial results is
+        // returned to the client to allow them to properly reconstruct the partial results.
+        // If the coprocessor host is adding to the result list, we cannot guarantee the
+        // correct ordering of partial results and so we prevent partial results from being
+        // formed.
+        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
+        boolean allowPartialResults =
+            clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
+        boolean moreRows = false;
+
+        // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
+        // certain time threshold on the server. When the time threshold is exceeded, the
+        // server stops the scan and sends back whatever Results it has accumulated within
+        // that time period (may be empty). Since heartbeat messages have the potential to
+        // create partial Results (in the event that the timeout occurs in the middle of a
+        // row), we must only generate heartbeat messages when the client can handle both
+        // heartbeats AND partials
+        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
+
+        long timeLimit = getTimeLimit(controller, allowHeartbeatMessages);
+
+        final LimitScope sizeScope =
+            allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
+        final LimitScope timeScope =
+            allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
+
+        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
+
+        // Configure with limits for this RPC. Set keep progress true since size progress
+        // towards size limit should be kept between calls to nextRaw
+        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
+        contextBuilder.setSizeLimit(sizeScope, maxResultSize);
+        contextBuilder.setBatchLimit(scanner.getBatch());
+        contextBuilder.setTimeLimit(timeScope, timeLimit);
+        contextBuilder.setTrackMetrics(trackMetrics);
+        ScannerContext scannerContext = contextBuilder.build();
+        boolean limitReached = false;
+        while (i < rows) {
+          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
+          // batch limit is a limit on the number of cells per Result. Thus, if progress is
+          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
+          // reset the batch progress between nextRaw invocations since we don't want the
+          // batch progress from previous calls to affect future calls
+          scannerContext.setBatchProgress(0);
+
+          // Collect values to be returned here
+          moreRows = scanner.nextRaw(values, scannerContext);
+
+          if (!values.isEmpty()) {
+            final boolean partial = scannerContext.partialResultFormed();
+            Result r = Result.create(values, null, stale, partial);
+            lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
+            results.add(r);
+            i++;
           }
-        }
-        boolean scannerClosed = false;
-        try {
-          // Remove lease while its being processed in server; protects against case
-          // where processing of request takes > lease expiration time.
-          lease = regionServer.leases.removeLease(scannerName);
-          List<Result> results = new ArrayList<Result>();
 
-          boolean done = false;
-          // Call coprocessor. Get region info from scanner.
-          if (region != null && region.getCoprocessorHost() != null) {
-            Boolean bypass = region.getCoprocessorHost().preScannerNext(
-              scanner, results, rows);
-            if (!results.isEmpty()) {
-              for (Result r : results) {
-                lastBlock = addSize(context, r, lastBlock);
-              }
+          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
+          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
+          boolean rowLimitReached = i >= rows;
+          limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
+
+          if (limitReached || !moreRows) {
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: " + moreRows
+                  + " scannerContext: " + scannerContext);
             }
-            if (bypass != null && bypass.booleanValue()) {
-              done = true;
+            // We only want to mark a ScanResponse as a heartbeat message in the event that
+            // there are more values to be read server side. If there aren't more values,
+            // marking it as a heartbeat is wasteful because the client will need to issue
+            // another ScanRequest only to realize that they already have all the values
+            if (moreRows) {
+              // Heartbeat messages occur when the time limit has been reached.
+              builder.setHeartbeatMessage(timeLimitReached);
             }
+            break;
           }
+          values.clear();
+        }
+        if (limitReached || moreRows) {
+          // We stopped prematurely
+          builder.setMoreResultsInRegion(true);
+        } else {
+          // We didn't get a single batch
+          builder.setMoreResultsInRegion(false);
+        }
 
-          if (!done) {
-            long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
-            if (maxResultSize <= 0) {
-              maxResultSize = maxQuotaResultSize;
-            }
-            // This is cells inside a row. Default size is 10 so if many versions or many cfs,
-            // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
-            // arbitrary 32. TODO: keep record of general size of results being returned.
-            List<Cell> values = new ArrayList<Cell>(32);
-            region.startRegionOperation(Operation.SCAN);
-            try {
-              int i = 0;
-              long before = EnvironmentEdgeManager.currentTime();
-              synchronized(scanner) {
-                boolean stale = (region.getRegionInfo().getReplicaId() != 0);
-                boolean clientHandlesPartials =
-                    request.hasClientHandlesPartials() && request.getClientHandlesPartials();
-                boolean clientHandlesHeartbeats =
-                    request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
-
-                // On the server side we must ensure that the correct ordering of partial results is
-                // returned to the client to allow them to properly reconstruct the partial results.
-                // If the coprocessor host is adding to the result list, we cannot guarantee the
-                // correct ordering of partial results and so we prevent partial results from being
-                // formed.
-                boolean serverGuaranteesOrderOfPartials = results.isEmpty();
-                boolean allowPartialResults =
-                    clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
-                boolean moreRows = false;
-
-                // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
-                // certain time threshold on the server. When the time threshold is exceeded, the
-                // server stops the scan and sends back whatever Results it has accumulated within
-                // that time period (may be empty). Since heartbeat messages have the potential to
-                // create partial Results (in the event that the timeout occurs in the middle of a
-                // row), we must only generate heartbeat messages when the client can handle both
-                // heartbeats AND partials
-                boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
-
-                // Default value of timeLimit is negative to indicate no timeLimit should be
-                // enforced.
-                long timeLimit = -1;
-
-                // Set the time limit to be half of the more restrictive timeout value (one of the
-                // timeout values must be positive). In the event that both values are positive, the
-                // more restrictive of the two is used to calculate the limit.
-                if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
-                  long timeLimitDelta;
-                  if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
-                    timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
-                  } else {
-                    timeLimitDelta =
-                        scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
-                  }
-                  if (controller != null) {
-                    if (controller instanceof HBaseRpcController) {
-                      HBaseRpcController pRpcController =
-                          (HBaseRpcController)controller;
-                      if (pRpcController.getCallTimeout() > 0) {
-                        timeLimitDelta = Math.min(timeLimitDelta, pRpcController.getCallTimeout());
-                      }
-                    } else {
-                      throw new UnsupportedOperationException("We only do " +
-                        "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
-                    }
-                  }
-                  // Use half of whichever timeout value was more restrictive... But don't allow
-                  // the time limit to be less than the allowable minimum (could cause an
-                  // immediatate timeout before scanning any data).
-                  timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
-                  timeLimit = System.currentTimeMillis() + timeLimitDelta;
-                }
-
-                final LimitScope sizeScope =
-                    allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
-                final LimitScope timeScope =
-                    allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
-
-                boolean trackMetrics =
-                    request.hasTrackScanMetrics() && request.getTrackScanMetrics();
-
-                // Configure with limits for this RPC. Set keep progress true since size progress
-                // towards size limit should be kept between calls to nextRaw
-                ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
-                contextBuilder.setSizeLimit(sizeScope, maxResultSize);
-                contextBuilder.setBatchLimit(scanner.getBatch());
-                contextBuilder.setTimeLimit(timeScope, timeLimit);
-                contextBuilder.setTrackMetrics(trackMetrics);
-                ScannerContext scannerContext = contextBuilder.build();
-                boolean limitReached = false;
-                while (i < rows) {
-                  // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
-                  // batch limit is a limit on the number of cells per Result. Thus, if progress is
-                  // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
-                  // reset the batch progress between nextRaw invocations since we don't want the
-                  // batch progress from previous calls to affect future calls
-                  scannerContext.setBatchProgress(0);
-
-                  // Collect values to be returned here
-                  moreRows = scanner.nextRaw(values, scannerContext);
-
-                  if (!values.isEmpty()) {
-                    final boolean partial = scannerContext.partialResultFormed();
-                    Result r = Result.create(values, null, stale, partial);
-                    lastBlock = addSize(context, r, lastBlock);
-                    results.add(r);
-                    i++;
-                  }
-
-                  boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
-                  boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
-                  boolean rowLimitReached = i >= rows;
-                  limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
-
-                  if (limitReached || !moreRows) {
-                    if (LOG.isTraceEnabled()) {
-                      LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: "
-                          + moreRows + " scannerContext: " + scannerContext);
-                    }
-                    // We only want to mark a ScanResponse as a heartbeat message in the event that
-                    // there are more values to be read server side. If there aren't more values,
-                    // marking it as a heartbeat is wasteful because the client will need to issue
-                    // another ScanRequest only to realize that they already have all the values
-                    if (moreRows) {
-                      // Heartbeat messages occur when the time limit has been reached.
-                      builder.setHeartbeatMessage(timeLimitReached);
-                    }
-                    break;
-                  }
-                  values.clear();
-                }
-
-                if (limitReached || moreRows) {
-                  // We stopped prematurely
-                  builder.setMoreResultsInRegion(true);
-                } else {
-                  // We didn't get a single batch
-                  builder.setMoreResultsInRegion(false);
-                }
-
-                // Check to see if the client requested that we track metrics server side. If the
-                // client requested metrics, retrieve the metrics from the scanner context.
-                if (trackMetrics) {
-                  Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
-                  ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
-                  NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
-
-                  for (Entry<String, Long> entry : metrics.entrySet()) {
-                    pairBuilder.setName(entry.getKey());
-                    pairBuilder.setValue(entry.getValue());
-                    metricBuilder.addMetrics(pairBuilder.build());
-                  }
-
-                  builder.setScanMetrics(metricBuilder.build());
-                }
-              }
-              region.updateReadRequestsCount(i);
-              long end = EnvironmentEdgeManager.currentTime();
-              long responseCellSize = context != null ? context.getResponseCellSize() : 0;
-              region.getMetrics().updateScanTime(end - before);
-              if (regionServer.metricsRegionServer != null) {
-                regionServer.metricsRegionServer.updateScanSize(responseCellSize);
-                regionServer.metricsRegionServer.updateScanTime(end - before);
-              }
-            } finally {
-              region.closeRegionOperation();
-            }
-            // coprocessor postNext hook
-            if (region != null && region.getCoprocessorHost() != null) {
-              region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
-            }
+        // Check to see if the client requested that we track metrics server side. If the
+        // client requested metrics, retrieve the metrics from the scanner context.
+        if (trackMetrics) {
+          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
+          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
+          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
+
+          for (Entry<String, Long> entry : metrics.entrySet()) {
+            pairBuilder.setName(entry.getKey());
+            pairBuilder.setValue(entry.getValue());
+            metricBuilder.addMetrics(pairBuilder.build());
           }
 
-          quota.addScanResult(results);
+          builder.setScanMetrics(metricBuilder.build());
+        }
+      }
+      region.updateReadRequestsCount(i);
+      long end = EnvironmentEdgeManager.currentTime();
+      long responseCellSize = context != null ? context.getResponseCellSize() : 0;
+      region.getMetrics().updateScanTime(end - before);
+      if (regionServer.metricsRegionServer != null) {
+        regionServer.metricsRegionServer.updateScanSize(responseCellSize);
+        regionServer.metricsRegionServer.updateScanTime(end - before);
+      }
+    } finally {
+      region.closeRegionOperation();
+    }
+    // coprocessor postNext hook
+    if (region.getCoprocessorHost() != null) {
+      region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
+    }
+    return builder.getMoreResultsInRegion();
+  }
 
-          // If the scanner's filter - if any - is done with the scan
-          // and wants to tell the client to stop the scan. This is done by passing
-          // a null result, and setting moreResults to false.
-          if (scanner.isFilterDone() && results.isEmpty()) {
-            moreResults = false;
-            results = null;
-          } else {
-            addResults(builder, results, controller,
-                RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
-                isClientCellBlockSupport(context));
-          }
-        } catch (IOException e) {
-          // The scanner state might be left in a dirty state, so we will tell the Client to
-          // fail this RPC and close the scanner while opening up another one from the start of
-          // row that the client has last seen.
-          closeScanner(region, scanner, scannerName, context);
-          // scanner is closed here
-          scannerClosed = true;
-
-          // If it is a CorruptHFileException or a FileNotFoundException, throw the
-          // DoNotRetryIOException. This can avoid the retry in ClientScanner.
-          if (e instanceof CorruptHFileException || e instanceof FileNotFoundException) {
-            throw new DoNotRetryIOException(e);
-          }
-          // We closed the scanner already. Instead of throwing the IOException, and client
-          // retrying with the same scannerId only to get USE on the next RPC, we directly throw
-          // a special exception to save an RPC.
-          if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
-            // 1.4.0+ clients know how to handle
-            throw new ScannerResetException("Scanner is closed on the server-side", e);
-          } else {
-            // older clients do not know about SRE. Just throw USE, which they will handle
-            throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
-                + " scanner state for clients older than 1.3.", e);
-          }
-        } finally {
-          // If the scanner is not closed, set the shipped callback
-          if (!scannerClosed) {
-            if (context != null) {
-              context.setCallBack(rsh.shippedCallback);
+  /**
+   * Scan data in a table.
+   *
+   * @param controller the RPC controller
+   * @param request the scan request
+   * @throws ServiceException
+   */
+  @Override
+  public ScanResponse scan(final RpcController controller, final ScanRequest request)
+      throws ServiceException {
+    if (controller != null && !(controller instanceof HBaseRpcController)) {
+      throw new UnsupportedOperationException(
+          "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
+    }
+    if (!request.hasScannerId() && !request.hasScan()) {
+      throw new ServiceException(
+          new DoNotRetryIOException("Missing required input: scannerId or scan"));
+    }
+    try {
+      checkOpen();
+    } catch (IOException e) {
+      if (request.hasScannerId()) {
+        String scannerName = Long.toString(request.getScannerId());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+            "Server shutting down and client tried to access missing scanner " + scannerName);
+        }
+        if (regionServer.leases != null) {
+          try {
+            regionServer.leases.cancelLease(scannerName);
+          } catch (LeaseException le) {
+            // No problem, ignore
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
             }
-
-            // Adding resets expiration time on lease.
-            if (scanners.containsKey(scannerName)) {
-              ttl = this.scannerLeaseTimeoutPeriod;
-              // When context != null, adding back the lease will be done in callback set above.
-              if (context == null) {
-                if (lease != null) regionServer.leases.addLease(lease);
-              }
+          }
+        }
+      }
+      throw new ServiceException(e);
+    }
+    requestCount.increment();
+    rpcScanRequestCount.increment();
+    RegionScannerHolder rsh;
+    ScanResponse.Builder builder = ScanResponse.newBuilder();
+    boolean isSmallScan;
+    try {
+      if (request.hasScannerId()) {
+        rsh = getRegionScanner(request);
+        isSmallScan = false;
+      } else {
+        Pair<RegionScannerHolder, Boolean> pair = newRegionScanner(request, builder);
+        rsh = pair.getFirst();
+        isSmallScan = pair.getSecond().booleanValue();
+      }
+    } catch (IOException e) {
+      if (e == SCANNER_ALREADY_CLOSED) {
+        // Now we will close scanner automatically if there are no more results for this region but
+        // the old client will still send a close request to us. Just ignore it and return.
+        return builder.build();
+      }
+      throw new ServiceException(e);
+    }
+    Region region = rsh.r;
+    String scannerName = rsh.scannerName;
+    Leases.Lease lease;
+    try {
+      // Remove lease while its being processed in server; protects against case
+      // where processing of request takes > lease expiration time.
+      lease = regionServer.leases.removeLease(scannerName);
+    } catch (LeaseException e) {
+      throw new ServiceException(e);
+    }
+    if (request.hasRenew() && request.getRenew()) {
+      // add back and return
+      addScannerLeaseBack(lease);
+      try {
+        checkScanNextCallSeq(request, rsh);
+      } catch (OutOfOrderScannerNextException e) {
+        throw new ServiceException(e);
+      }
+      return builder.build();
+    }
+    OperationQuota quota;
+    try {
+      quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
+    } catch (IOException e) {
+      addScannerLeaseBack(lease);
+      throw new ServiceException(e);
+    };
+    try {
+      checkScanNextCallSeq(request, rsh);
+    } catch (OutOfOrderScannerNextException e) {
+      addScannerLeaseBack(lease);
+      throw new ServiceException(e);
+    }
+    // Now we have increased the next call sequence. If we give client an error, the retry will
+    // never success. So we'd better close the scanner and return a DoNotRetryIOException to client
+    // and then client will try to open a new scanner.
+    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
+    int rows; // this is scan.getCaching
+    if (request.hasNumberOfRows()) {
+      rows = request.getNumberOfRows();
+    } else {
+      rows = closeScanner ? 0 : 1;
+    }
+    RpcCallContext context = RpcServer.getCurrentCall();
+    // now let's do the real scan.
+    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
+    RegionScanner scanner = rsh.s;
+    boolean moreResults = true;
+    boolean moreResultsInRegion = true;
+    MutableObject lastBlock = new MutableObject();
+    boolean scannerClosed = false;
+    try {
+      List<Result> results = new ArrayList<>();
+      if (rows > 0) {
+        boolean done = false;
+        // Call coprocessor. Get region info from scanner.
+        if (region.getCoprocessorHost() != null) {
+          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
+          if (!results.isEmpty()) {
+            for (Result r : results) {
+              lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
             }
           }
+          if (bypass != null && bypass.booleanValue()) {
+            done = true;
+          }
+        }
+        if (!done) {
+          moreResultsInRegion = scan((HBaseRpcController) controller, request, rsh, isSmallScan,
+            maxQuotaResultSize, rows, results, builder, lastBlock, context);
         }
       }
 
-      if (!moreResults || closeScanner) {
-        ttl = 0;
+      quota.addScanResult(results);
+
+      if (scanner.isFilterDone() && results.isEmpty()) {
+        // If the scanner's filter - if any - is done with the scan
+        // only set moreResults to false if the results is empty. This is used to keep compatible
+        // with the old scan implementation where we just ignore the returned results if moreResults
+        // is false. Can remove the isEmpty check after we get rid of the old implementation.
         moreResults = false;
-        if (closeScanner(region, scanner, scannerName, context)) {
-          return builder.build(); // bypass
-        }
       }
-
-      if (ttl > 0) {
-        builder.setTtl(ttl);
+      addResults(builder, results, (HBaseRpcController) controller,
+        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
+        isClientCellBlockSupport(context));
+      if (!moreResults || !moreResultsInRegion || closeScanner) {
+        scannerClosed = true;
+        closeScanner(region, scanner, scannerName, context);
       }
-      builder.setScannerId(scannerId);
       builder.setMoreResults(moreResults);
       return builder.build();
-    } catch (IOException ie) {
-      if (scannerName != null && ie instanceof NotServingRegionException) {
-        RegionScannerHolder rsh = scanners.remove(scannerName);
-        if (rsh != null) {
-          try {
-            RegionScanner scanner = rsh.s;
-            LOG.warn(scannerName + " encountered " + ie.getMessage() + ", closing ...");
-            scanner.close();
-            regionServer.leases.cancelLease(scannerName);
-          } catch (IOException e) {
-           LOG.warn("Getting exception closing " + scannerName, e);
-          }
+    } catch (Exception e) {
+      try {
+        // scanner is closed here
+        scannerClosed = true;
+        // The scanner state might be left in a dirty state, so we will tell the Client to
+        // fail this RPC and close the scanner while opening up another one from the start of
+        // row that the client has last seen.
+        closeScanner(region, scanner, scannerName, context);
+
+
+        // If it is a CorruptHFileException or a FileNotFoundException, throw the
+        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
+        if (e instanceof CorruptHFileException || e instanceof FileNotFoundException) {
+          throw new DoNotRetryIOException(e);
         }
+        // We closed the scanner already. Instead of throwing the IOException, and client
+        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
+        // a special exception to save an RPC.
+        if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
+          // 1.4.0+ clients know how to handle
+          throw new ScannerResetException("Scanner is closed on the server-side", e);
+        } else {
+          // older clients do not know about SRE. Just throw USE, which they will handle
+          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
+              + " scanner state for clients older than 1.3.", e);
+        }
+      } catch (IOException ioe) {
+        throw new ServiceException(ioe);
       }
-      throw new ServiceException(ie);
     } finally {
-      if (quota != null) {
-        quota.close();
+      if (!scannerClosed) {
+        // Adding resets expiration time on lease.
+        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
+        if (context != null) {
+          context.setCallBack(rsh.shippedCallback);
+        } else {
+          // When context != null, adding back the lease will be done in callback set above.
+          addScannerLeaseBack(lease);
+        }
       }
+      quota.close();
     }
   }
 
-  private boolean closeScanner(Region region, RegionScanner scanner, String scannerName,
+  private void closeScanner(Region region, RegionScanner scanner, String scannerName,
       RpcCallContext context) throws IOException {
-    if (region != null && region.getCoprocessorHost() != null) {
+    if (region.getCoprocessorHost() != null) {
       if (region.getCoprocessorHost().preScannerClose(scanner)) {
-        return true; // bypass
+        // bypass the actual close.
+        return;
       }
     }
     RegionScannerHolder rsh = scanners.remove(scannerName);
@@ -3057,19 +3115,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       } else {
         rsh.s.close();
       }
-      try {
-        regionServer.leases.cancelLease(scannerName);
-      } catch (LeaseException le) {
-        // No problem, ignore
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
-        }
-      }
-      if (region != null && region.getCoprocessorHost() != null) {
+      if (region.getCoprocessorHost() != null) {
         region.getCoprocessorHost().postScannerClose(scanner);
       }
     }
-    return false;
   }
 
   @Override