You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2015/04/18 00:42:56 UTC

[2/2] hbase git commit: HBASE-13090 Progress heartbeats for long running scanners

HBASE-13090 Progress heartbeats for long running scanners

Signed-off-by: stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/abe3796a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/abe3796a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/abe3796a

Branch: refs/heads/master
Commit: abe3796a9907485c875932caa5f1c82071495c0f
Parents: 2c5dc53
Author: Jonathan Lawlor <jo...@cloudera.com>
Authored: Tue Mar 10 14:24:07 2015 -0700
Committer: stack <st...@apache.org>
Committed: Fri Apr 17 15:42:46 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/ClientScanner.java      |  49 +-
 .../hadoop/hbase/client/ScannerCallable.java    |  23 +
 .../client/ScannerCallableWithReplicas.java     |  10 +
 .../hadoop/hbase/protobuf/RequestConverter.java |   3 +
 .../src/main/resources/hbase-default.xml        |  12 +
 .../hbase/protobuf/generated/ClientProtos.java  | 349 ++++++++++--
 hbase-protocol/src/main/protobuf/Client.proto   |   7 +
 .../hbase/client/ClientSideRegionScanner.java   |   4 +-
 .../hadoop/hbase/regionserver/HRegion.java      |  59 +-
 .../hadoop/hbase/regionserver/KeyValueHeap.java |   2 +
 .../regionserver/NoLimitScannerContext.java     |  28 +-
 .../hbase/regionserver/RSRpcServices.java       |  94 +++-
 .../hbase/regionserver/ScannerContext.java      | 124 ++++-
 .../hadoop/hbase/regionserver/StoreScanner.java |  31 ++
 .../hbase/TestPartialResultsFromClientSide.java |   3 +-
 .../coprocessor/TestCoprocessorInterface.java   |   1 -
 .../TestScannerHeartbeatMessages.java           | 538 +++++++++++++++++++
 .../compactions/TestStripeCompactionPolicy.java |   1 -
 18 files changed, 1234 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index ccd8c2d..6a129c7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -393,7 +393,6 @@ public class ClientScanner extends AbstractClientScanner {
         // returns an empty array if scanning is to go on and we've just
         // exhausted current region.
         values = call(callable, caller, scannerTimeout);
-
         // When the replica switch happens, we need to do certain operations
         // again. The callable will openScanner with the right startkey
         // but we need to pick up from there. Bypass the rest of the loop
@@ -482,7 +481,8 @@ public class ClientScanner extends AbstractClientScanner {
       // Groom the array of Results that we received back from the server before adding that
       // Results to the scanner's cache. If partial results are not allowed to be seen by the
       // caller, all book keeping will be performed within this method.
-      List<Result> resultsToAddToCache = getResultsToAddToCache(values);
+      List<Result> resultsToAddToCache =
+          getResultsToAddToCache(values, callable.isHeartbeatMessage());
       if (!resultsToAddToCache.isEmpty()) {
         for (Result rs : resultsToAddToCache) {
           cache.add(rs);
@@ -494,6 +494,19 @@ public class ClientScanner extends AbstractClientScanner {
           this.lastResult = rs;
         }
       }
+
+      // Caller of this method just wants a Result. If we see a heartbeat message, it means
+      // processing of the scan is taking a long time server side. Rather than continue to
+      // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
+      // unnecessary delays to the caller
+      if (callable.isHeartbeatMessage() && cache.size() > 0) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Heartbeat message received and cache contains Results."
+              + " Breaking out of scan loop");
+        }
+        break;
+      }
+
       // We expect that the server won't have more results for us when we exhaust
       // the size (bytes or count) of the results returned. If the server *does* inform us that
       // there are more results, we want to avoid possiblyNextScanner(...). Only when we actually
@@ -507,20 +520,38 @@ public class ClientScanner extends AbstractClientScanner {
       // !partialResults.isEmpty() means that we are still accumulating partial Results for a
       // row. We should not change scanners before we receive all the partial Results for that
       // row.
-    } while (remainingResultSize > 0 && countdown > 0 && !serverHasMoreResults
+    } while (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
         && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)));
   }
 
   /**
+   * @param remainingResultSize
+   * @param remainingRows
+   * @param regionHasMoreResults
+   * @return true when the region has no more results and the size and row limits have not been
+   *         reached. In that case the region must be changed before scanning can continue
+   */
+  private boolean doneWithRegion(long remainingResultSize, int remainingRows,
+      boolean regionHasMoreResults) {
+    return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults;
+  }
+
+  /**
    * This method ensures all of our book keeping regarding partial results is kept up to date. This
    * method should be called once we know that the results we received back from the RPC request do
    * not contain errors. We return a list of results that should be added to the cache. In general,
    * this list will contain all NON-partial results from the input array (unless the client has
    * specified that they are okay with receiving partial results)
+   * @param resultsFromServer The array of {@link Result}s returned from the server
+   * @param heartbeatMessage Flag that is true when the response received from the server was a
+   *          heartbeat message sent to keep the client-server connection alive, rather than a
+   *          complete response
    * @return the list of results that should be added to the cache.
    * @throws IOException
    */
-  protected List<Result> getResultsToAddToCache(Result[] resultsFromServer) throws IOException {
+  protected List<Result>
+      getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage)
+          throws IOException {
     int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
     List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
 
@@ -538,10 +569,14 @@ public class ClientScanner extends AbstractClientScanner {
       return resultsToAddToCache;
     }
 
-    // If no results were returned it indicates that we have the all the partial results necessary
-    // to construct the complete result.
+    // If no results were returned it indicates that either we have all the partial results
+    // necessary to construct the complete result or the server had to send a heartbeat message
+    // to the client to keep the client-server connection alive
     if (resultsFromServer == null || resultsFromServer.length == 0) {
-      if (!partialResults.isEmpty()) {
+      // If this response was an empty heartbeat message, then we have not exhausted the region
+      // and thus there may be more partials server side that still need to be added to the partial
+      // list before we form the complete Result
+      if (!partialResults.isEmpty() && !heartbeatMessage) {
         resultsToAddToCache.add(Result.createCompleteResult(partialResults));
         clearPartialResults();
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 714c9fe..b53c9d6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -78,6 +78,12 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
   protected final int id;
   protected boolean serverHasMoreResultsContext;
   protected boolean serverHasMoreResults;
+
+  /**
+   * Saves whether or not the most recent response from the server was a heartbeat message.
+   * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
+   */
+  protected boolean heartbeatMessage = false;
   static {
     try {
       myAddress = DNS.getDefaultHost("default", "default");
@@ -191,6 +197,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
       } else {
         Result [] rrs = null;
         ScanRequest request = null;
+        // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
+        setHeartbeatMessage(false);
         try {
           incRPCcallsMetrics();
           request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
@@ -211,6 +219,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
             // See HBASE-5974
             nextCallSeq++;
             long timestamp = System.currentTimeMillis();
+            setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
             // Results are returned via controller
             CellScanner cellScanner = controller.cellScanner();
             rrs = ResponseConverter.getResults(cellScanner, response);
@@ -291,6 +300,20 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
     return null;
   }
 
+  /**
+   * @return true when the most recent RPC response indicated that the response was a heartbeat
+   *         message. Heartbeat messages are sent back from the server when the processing of the
+   *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
+   *         timeouts during long running scan operations.
+   */
+  protected boolean isHeartbeatMessage() {
+    return heartbeatMessage;
+  }
+
+  protected void setHeartbeatMessage(boolean heartbeatMessage) {
+    this.heartbeatMessage = heartbeatMessage;
+  }
+
   private void incRPCcallsMetrics() {
     if (this.scanMetrics == null) {
       return;

http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 7ba152b..d672852 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -273,6 +273,16 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
     return replicaSwitched.get();
   }
 
+  /**
+   * @return true when the most recent RPC response indicated that the response was a heartbeat
+   *         message. Heartbeat messages are sent back from the server when the processing of the
+   *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
+   *         timeouts during long running scan operations.
+   */
+  public boolean isHeartbeatMessage() {
+    return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
+  }
+
   private int addCallsForCurrentReplica(
       ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
     RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);

http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index 16c3dbf..b0dac2f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -489,6 +489,7 @@ public final class RequestConverter {
     builder.setRegion(region);
     builder.setScan(ProtobufUtil.toScan(scan));
     builder.setClientHandlesPartials(true);
+    builder.setClientHandlesHeartbeats(true);
     return builder.build();
   }
 
@@ -507,6 +508,7 @@ public final class RequestConverter {
     builder.setCloseScanner(closeScanner);
     builder.setScannerId(scannerId);
     builder.setClientHandlesPartials(true);
+    builder.setClientHandlesHeartbeats(true);
     return builder.build();
   }
 
@@ -527,6 +529,7 @@ public final class RequestConverter {
     builder.setScannerId(scannerId);
     builder.setNextCallSeq(nextCallSeq);
     builder.setClientHandlesPartials(true);
+    builder.setClientHandlesHeartbeats(true);
     return builder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index bf14a53..f107fb7 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -905,6 +905,18 @@ possible configurations would overwhelm and obscure the important.
         but will eventually throw a TimeoutException.</description>
   </property>
   <property>
+    <name>hbase.cells.scanned.per.heartbeat.check</name>
+    <value>10000</value>
+    <description>The number of cells scanned in between heartbeat checks. Heartbeat
+        checks occur during the processing of scans to determine whether or not the
+        server should stop scanning in order to send back a heartbeat message to the
+        client. Heartbeat messages are used to keep the client-server connection alive
+        during long running scans. Small values mean that the heartbeat checks will
+        occur more often and thus will provide a tighter bound on the execution time of
+        the scan. Larger values mean that the heartbeat checks occur less frequently.
+        </description>
+  </property>
+  <property>
     <name>hbase.rpc.shortoperation.timeout</name>
     <value>10000</value>
     <description>This is another version of "hbase.rpc.timeout". For those RPC operation

http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
index 60ab651..2991ece 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
@@ -16433,6 +16433,16 @@ public final class ClientProtos {
      * <code>optional bool client_handles_partials = 7;</code>
      */
     boolean getClientHandlesPartials();
+
+    // optional bool client_handles_heartbeats = 8;
+    /**
+     * <code>optional bool client_handles_heartbeats = 8;</code>
+     */
+    boolean hasClientHandlesHeartbeats();
+    /**
+     * <code>optional bool client_handles_heartbeats = 8;</code>
+     */
+    boolean getClientHandlesHeartbeats();
   }
   /**
    * Protobuf type {@code ScanRequest}
@@ -16549,6 +16559,11 @@ public final class ClientProtos {
               clientHandlesPartials_ = input.readBool();
               break;
             }
+            case 64: {
+              bitField0_ |= 0x00000080;
+              clientHandlesHeartbeats_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -16713,6 +16728,22 @@ public final class ClientProtos {
       return clientHandlesPartials_;
     }
 
+    // optional bool client_handles_heartbeats = 8;
+    public static final int CLIENT_HANDLES_HEARTBEATS_FIELD_NUMBER = 8;
+    private boolean clientHandlesHeartbeats_;
+    /**
+     * <code>optional bool client_handles_heartbeats = 8;</code>
+     */
+    public boolean hasClientHandlesHeartbeats() {
+      return ((bitField0_ & 0x00000080) == 0x00000080);
+    }
+    /**
+     * <code>optional bool client_handles_heartbeats = 8;</code>
+     */
+    public boolean getClientHandlesHeartbeats() {
+      return clientHandlesHeartbeats_;
+    }
+
     private void initFields() {
       region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
       scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance();
@@ -16721,6 +16752,7 @@ public final class ClientProtos {
       closeScanner_ = false;
       nextCallSeq_ = 0L;
       clientHandlesPartials_ = false;
+      clientHandlesHeartbeats_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -16767,6 +16799,9 @@ public final class ClientProtos {
       if (((bitField0_ & 0x00000040) == 0x00000040)) {
         output.writeBool(7, clientHandlesPartials_);
       }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        output.writeBool(8, clientHandlesHeartbeats_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -16804,6 +16839,10 @@ public final class ClientProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(7, clientHandlesPartials_);
       }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(8, clientHandlesHeartbeats_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -16862,6 +16901,11 @@ public final class ClientProtos {
         result = result && (getClientHandlesPartials()
             == other.getClientHandlesPartials());
       }
+      result = result && (hasClientHandlesHeartbeats() == other.hasClientHandlesHeartbeats());
+      if (hasClientHandlesHeartbeats()) {
+        result = result && (getClientHandlesHeartbeats()
+            == other.getClientHandlesHeartbeats());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -16903,6 +16947,10 @@ public final class ClientProtos {
         hash = (37 * hash) + CLIENT_HANDLES_PARTIALS_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getClientHandlesPartials());
       }
+      if (hasClientHandlesHeartbeats()) {
+        hash = (37 * hash) + CLIENT_HANDLES_HEARTBEATS_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getClientHandlesHeartbeats());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -17049,6 +17097,8 @@ public final class ClientProtos {
         bitField0_ = (bitField0_ & ~0x00000020);
         clientHandlesPartials_ = false;
         bitField0_ = (bitField0_ & ~0x00000040);
+        clientHandlesHeartbeats_ = false;
+        bitField0_ = (bitField0_ & ~0x00000080);
         return this;
       }
 
@@ -17113,6 +17163,10 @@ public final class ClientProtos {
           to_bitField0_ |= 0x00000040;
         }
         result.clientHandlesPartials_ = clientHandlesPartials_;
+        if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+          to_bitField0_ |= 0x00000080;
+        }
+        result.clientHandlesHeartbeats_ = clientHandlesHeartbeats_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -17150,6 +17204,9 @@ public final class ClientProtos {
         if (other.hasClientHandlesPartials()) {
           setClientHandlesPartials(other.getClientHandlesPartials());
         }
+        if (other.hasClientHandlesHeartbeats()) {
+          setClientHandlesHeartbeats(other.getClientHandlesHeartbeats());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -17588,6 +17645,39 @@ public final class ClientProtos {
         return this;
       }
 
+      // optional bool client_handles_heartbeats = 8;
+      private boolean clientHandlesHeartbeats_ ;
+      /**
+       * <code>optional bool client_handles_heartbeats = 8;</code>
+       */
+      public boolean hasClientHandlesHeartbeats() {
+        return ((bitField0_ & 0x00000080) == 0x00000080);
+      }
+      /**
+       * <code>optional bool client_handles_heartbeats = 8;</code>
+       */
+      public boolean getClientHandlesHeartbeats() {
+        return clientHandlesHeartbeats_;
+      }
+      /**
+       * <code>optional bool client_handles_heartbeats = 8;</code>
+       */
+      public Builder setClientHandlesHeartbeats(boolean value) {
+        bitField0_ |= 0x00000080;
+        clientHandlesHeartbeats_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool client_handles_heartbeats = 8;</code>
+       */
+      public Builder clearClientHandlesHeartbeats() {
+        bitField0_ = (bitField0_ & ~0x00000080);
+        clientHandlesHeartbeats_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:ScanRequest)
     }
 
@@ -17806,6 +17896,30 @@ public final class ClientProtos {
      * </pre>
      */
     boolean getMoreResultsInRegion();
+
+    // optional bool heartbeat_message = 9;
+    /**
+     * <code>optional bool heartbeat_message = 9;</code>
+     *
+     * <pre>
+     * This field is filled in if the server is sending back a heartbeat message.
+     * Heartbeat messages are sent back to the client to prevent the scanner from
+     * timing out. Seeing a heartbeat message communicates to the Client that the
+     * server would have continued to scan had the time limit not been reached.
+     * </pre>
+     */
+    boolean hasHeartbeatMessage();
+    /**
+     * <code>optional bool heartbeat_message = 9;</code>
+     *
+     * <pre>
+     * This field is filled in if the server is sending back a heartbeat message.
+     * Heartbeat messages are sent back to the client to prevent the scanner from
+     * timing out. Seeing a heartbeat message communicates to the Client that the
+     * server would have continued to scan had the time limit not been reached.
+     * </pre>
+     */
+    boolean getHeartbeatMessage();
   }
   /**
    * Protobuf type {@code ScanResponse}
@@ -17939,6 +18053,11 @@ public final class ClientProtos {
               moreResultsInRegion_ = input.readBool();
               break;
             }
+            case 72: {
+              bitField0_ |= 0x00000020;
+              heartbeatMessage_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -18252,6 +18371,36 @@ public final class ClientProtos {
       return moreResultsInRegion_;
     }
 
+    // optional bool heartbeat_message = 9;
+    public static final int HEARTBEAT_MESSAGE_FIELD_NUMBER = 9;
+    private boolean heartbeatMessage_;
+    /**
+     * <code>optional bool heartbeat_message = 9;</code>
+     *
+     * <pre>
+     * This field is filled in if the server is sending back a heartbeat message.
+     * Heartbeat messages are sent back to the client to prevent the scanner from
+     * timing out. Seeing a heartbeat message communicates to the Client that the
+     * server would have continued to scan had the time limit not been reached.
+     * </pre>
+     */
+    public boolean hasHeartbeatMessage() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    /**
+     * <code>optional bool heartbeat_message = 9;</code>
+     *
+     * <pre>
+     * This field is filled in if the server is sending back a heartbeat message.
+     * Heartbeat messages are sent back to the client to prevent the scanner from
+     * timing out. Seeing a heartbeat message communicates to the Client that the
+     * server would have continued to scan had the time limit not been reached.
+     * </pre>
+     */
+    public boolean getHeartbeatMessage() {
+      return heartbeatMessage_;
+    }
+
     private void initFields() {
       cellsPerResult_ = java.util.Collections.emptyList();
       scannerId_ = 0L;
@@ -18261,6 +18410,7 @@ public final class ClientProtos {
       stale_ = false;
       partialFlagPerResult_ = java.util.Collections.emptyList();
       moreResultsInRegion_ = false;
+      heartbeatMessage_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -18298,6 +18448,9 @@ public final class ClientProtos {
       if (((bitField0_ & 0x00000010) == 0x00000010)) {
         output.writeBool(8, moreResultsInRegion_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeBool(9, heartbeatMessage_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -18346,6 +18499,10 @@ public final class ClientProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(8, moreResultsInRegion_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(9, heartbeatMessage_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -18400,6 +18557,11 @@ public final class ClientProtos {
         result = result && (getMoreResultsInRegion()
             == other.getMoreResultsInRegion());
       }
+      result = result && (hasHeartbeatMessage() == other.hasHeartbeatMessage());
+      if (hasHeartbeatMessage()) {
+        result = result && (getHeartbeatMessage()
+            == other.getHeartbeatMessage());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -18445,6 +18607,10 @@ public final class ClientProtos {
         hash = (37 * hash) + MORE_RESULTS_IN_REGION_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getMoreResultsInRegion());
       }
+      if (hasHeartbeatMessage()) {
+        hash = (37 * hash) + HEARTBEAT_MESSAGE_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getHeartbeatMessage());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -18581,6 +18747,8 @@ public final class ClientProtos {
         bitField0_ = (bitField0_ & ~0x00000040);
         moreResultsInRegion_ = false;
         bitField0_ = (bitField0_ & ~0x00000080);
+        heartbeatMessage_ = false;
+        bitField0_ = (bitField0_ & ~0x00000100);
         return this;
       }
 
@@ -18648,6 +18816,10 @@ public final class ClientProtos {
           to_bitField0_ |= 0x00000010;
         }
         result.moreResultsInRegion_ = moreResultsInRegion_;
+        if (((from_bitField0_ & 0x00000100) == 0x00000100)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.heartbeatMessage_ = heartbeatMessage_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -18725,6 +18897,9 @@ public final class ClientProtos {
         if (other.hasMoreResultsInRegion()) {
           setMoreResultsInRegion(other.getMoreResultsInRegion());
         }
+        if (other.hasHeartbeatMessage()) {
+          setHeartbeatMessage(other.getHeartbeatMessage());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -19561,6 +19736,67 @@ public final class ClientProtos {
         return this;
       }
 
+      // optional bool heartbeat_message = 9;
+      private boolean heartbeatMessage_ ;
+      /**
+       * <code>optional bool heartbeat_message = 9;</code>
+       *
+       * <pre>
+       * This field is filled in if the server is sending back a heartbeat message.
+       * Heartbeat messages are sent back to the client to prevent the scanner from
+       * timing out. Seeing a heartbeat message communicates to the Client that the
+       * server would have continued to scan had the time limit not been reached.
+       * </pre>
+       */
+      public boolean hasHeartbeatMessage() {
+        return ((bitField0_ & 0x00000100) == 0x00000100);
+      }
+      /**
+       * <code>optional bool heartbeat_message = 9;</code>
+       *
+       * <pre>
+       * This field is filled in if the server is sending back a heartbeat message.
+       * Heartbeat messages are sent back to the client to prevent the scanner from
+       * timing out. Seeing a heartbeat message communicates to the Client that the
+       * server would have continued to scan had the time limit not been reached.
+       * </pre>
+       */
+      public boolean getHeartbeatMessage() {
+        return heartbeatMessage_;
+      }
+      /**
+       * <code>optional bool heartbeat_message = 9;</code>
+       *
+       * <pre>
+       * This field is filled in if the server is sending back a heartbeat message.
+       * Heartbeat messages are sent back to the client to prevent the scanner from
+       * timing out. Seeing a heartbeat message communicates to the Client that the
+       * server would have continued to scan had the time limit not been reached.
+       * </pre>
+       */
+      public Builder setHeartbeatMessage(boolean value) {
+        bitField0_ |= 0x00000100;
+        heartbeatMessage_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool heartbeat_message = 9;</code>
+       *
+       * <pre>
+       * This field is filled in if the server is sending back a heartbeat message.
+       * Heartbeat messages are sent back to the client to prevent the scanner from
+       * timing out. Seeing a heartbeat message communicates to the Client that the
+       * server would have continued to scan had the time limit not been reached.
+       * </pre>
+       */
+      public Builder clearHeartbeatMessage() {
+        bitField0_ = (bitField0_ & ~0x00000100);
+        heartbeatMessage_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:ScanResponse)
     }
 
@@ -32690,63 +32926,64 @@ public final class ClientProtos {
       "lies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027\n\010r" +
       "eversed\030\017 \001(\010:\005false\022)\n\013consistency\030\020 \001(" +
       "\0162\014.Consistency:\006STRONG\022\017\n\007caching\030\021 \001(\r" +
-      "\"\277\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio",
+      "\"\342\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio",
       "nSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\022\n\nscann" +
       "er_id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rc" +
       "lose_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(" +
-      "\004\022\037\n\027client_handles_partials\030\007 \001(\010\"\311\001\n\014S" +
-      "canResponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n" +
-      "\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022" +
-      "\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r" +
-      "\n\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_result" +
-      "\030\007 \003(\010\022\036\n\026more_results_in_region\030\010 \001(\010\"\263" +
-      "\001\n\024BulkLoadHFileRequest\022 \n\006region\030\001 \002(\0132",
-      "\020.RegionSpecifier\0225\n\013family_path\030\002 \003(\0132 " +
-      ".BulkLoadHFileRequest.FamilyPath\022\026\n\016assi" +
-      "gn_seq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family" +
-      "\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRes" +
-      "ponse\022\016\n\006loaded\030\001 \002(\010\"a\n\026CoprocessorServ" +
-      "iceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002" +
-      "(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014" +
-      "\"9\n\030CoprocessorServiceResult\022\035\n\005value\030\001 " +
-      "\001(\0132\016.NameBytesPair\"d\n\031CoprocessorServic" +
-      "eRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifi",
-      "er\022%\n\004call\030\002 \002(\0132\027.CoprocessorServiceCal" +
-      "l\"]\n\032CoprocessorServiceResponse\022 \n\006regio" +
-      "n\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005value\030\002 \002(\013" +
-      "2\016.NameBytesPair\"{\n\006Action\022\r\n\005index\030\001 \001(" +
-      "\r\022 \n\010mutation\030\002 \001(\0132\016.MutationProto\022\021\n\003g" +
-      "et\030\003 \001(\0132\004.Get\022-\n\014service_call\030\004 \001(\0132\027.C" +
-      "oprocessorServiceCall\"Y\n\014RegionAction\022 \n" +
-      "\006region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006atomi" +
-      "c\030\002 \001(\010\022\027\n\006action\030\003 \003(\0132\007.Action\"D\n\017Regi" +
-      "onLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\r",
-      "heapOccupancy\030\002 \001(\005:\0010\"\266\001\n\021ResultOrExcep" +
-      "tion\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 \001(\0132\007.Re" +
-      "sult\022!\n\texception\030\003 \001(\0132\016.NameBytesPair\022" +
-      "1\n\016service_result\030\004 \001(\0132\031.CoprocessorSer" +
-      "viceResult\022#\n\tloadStats\030\005 \001(\0132\020.RegionLo" +
-      "adStats\"f\n\022RegionActionResult\022-\n\021resultO" +
-      "rException\030\001 \003(\0132\022.ResultOrException\022!\n\t" +
-      "exception\030\002 \001(\0132\016.NameBytesPair\"f\n\014Multi" +
-      "Request\022#\n\014regionAction\030\001 \003(\0132\r.RegionAc" +
-      "tion\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tcondition\030\003 ",
-      "\001(\0132\n.Condition\"S\n\rMultiResponse\022/\n\022regi" +
-      "onActionResult\030\001 \003(\0132\023.RegionActionResul" +
-      "t\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consistency\022\n\n\006S" +
-      "TRONG\020\000\022\014\n\010TIMELINE\020\0012\205\003\n\rClientService\022" +
-      " \n\003Get\022\013.GetRequest\032\014.GetResponse\022)\n\006Mut" +
-      "ate\022\016.MutateRequest\032\017.MutateResponse\022#\n\004" +
-      "Scan\022\014.ScanRequest\032\r.ScanResponse\022>\n\rBul" +
-      "kLoadHFile\022\025.BulkLoadHFileRequest\032\026.Bulk" +
-      "LoadHFileResponse\022F\n\013ExecService\022\032.Copro" +
-      "cessorServiceRequest\032\033.CoprocessorServic",
-      "eResponse\022R\n\027ExecRegionServerService\022\032.C" +
-      "oprocessorServiceRequest\032\033.CoprocessorSe" +
-      "rviceResponse\022&\n\005Multi\022\r.MultiRequest\032\016." +
-      "MultiResponseBB\n*org.apache.hadoop.hbase" +
-      ".protobuf.generatedB\014ClientProtosH\001\210\001\001\240\001" +
-      "\001"
+      "\004\022\037\n\027client_handles_partials\030\007 \001(\010\022!\n\031cl" +
+      "ient_handles_heartbeats\030\010 \001(\010\"\344\001\n\014ScanRe" +
+      "sponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n\nscan" +
+      "ner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003tt" +
+      "l\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r\n\005sta" +
+      "le\030\006 \001(\010\022\037\n\027partial_flag_per_result\030\007 \003(" +
+      "\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031\n\021hea",
+      "rtbeat_message\030\t \001(\010\"\263\001\n\024BulkLoadHFileRe" +
+      "quest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" +
+      "5\n\013family_path\030\002 \003(\0132 .BulkLoadHFileRequ" +
+      "est.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032*" +
+      "\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002" +
+      "(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded\030\001 " +
+      "\002(\010\"a\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002" +
+      "(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_name\030" +
+      "\003 \002(\t\022\017\n\007request\030\004 \002(\014\"9\n\030CoprocessorSer" +
+      "viceResult\022\035\n\005value\030\001 \001(\0132\016.NameBytesPai",
+      "r\"d\n\031CoprocessorServiceRequest\022 \n\006region" +
+      "\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027" +
+      ".CoprocessorServiceCall\"]\n\032CoprocessorSe" +
+      "rviceResponse\022 \n\006region\030\001 \002(\0132\020.RegionSp" +
+      "ecifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"{" +
+      "\n\006Action\022\r\n\005index\030\001 \001(\r\022 \n\010mutation\030\002 \001(" +
+      "\0132\016.MutationProto\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n\014" +
+      "service_call\030\004 \001(\0132\027.CoprocessorServiceC" +
+      "all\"Y\n\014RegionAction\022 \n\006region\030\001 \002(\0132\020.Re" +
+      "gionSpecifier\022\016\n\006atomic\030\002 \001(\010\022\027\n\006action\030",
+      "\003 \003(\0132\007.Action\"D\n\017RegionLoadStats\022\027\n\014mem" +
+      "storeLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(" +
+      "\005:\0010\"\266\001\n\021ResultOrException\022\r\n\005index\030\001 \001(" +
+      "\r\022\027\n\006result\030\002 \001(\0132\007.Result\022!\n\texception\030" +
+      "\003 \001(\0132\016.NameBytesPair\0221\n\016service_result\030" +
+      "\004 \001(\0132\031.CoprocessorServiceResult\022#\n\tload" +
+      "Stats\030\005 \001(\0132\020.RegionLoadStats\"f\n\022RegionA" +
+      "ctionResult\022-\n\021resultOrException\030\001 \003(\0132\022" +
+      ".ResultOrException\022!\n\texception\030\002 \001(\0132\016." +
+      "NameBytesPair\"f\n\014MultiRequest\022#\n\014regionA",
+      "ction\030\001 \003(\0132\r.RegionAction\022\022\n\nnonceGroup" +
+      "\030\002 \001(\004\022\035\n\tcondition\030\003 \001(\0132\n.Condition\"S\n" +
+      "\rMultiResponse\022/\n\022regionActionResult\030\001 \003" +
+      "(\0132\023.RegionActionResult\022\021\n\tprocessed\030\002 \001" +
+      "(\010*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELIN" +
+      "E\020\0012\205\003\n\rClientService\022 \n\003Get\022\013.GetReques" +
+      "t\032\014.GetResponse\022)\n\006Mutate\022\016.MutateReques" +
+      "t\032\017.MutateResponse\022#\n\004Scan\022\014.ScanRequest" +
+      "\032\r.ScanResponse\022>\n\rBulkLoadHFile\022\025.BulkL" +
+      "oadHFileRequest\032\026.BulkLoadHFileResponse\022",
+      "F\n\013ExecService\022\032.CoprocessorServiceReque" +
+      "st\032\033.CoprocessorServiceResponse\022R\n\027ExecR" +
+      "egionServerService\022\032.CoprocessorServiceR" +
+      "equest\032\033.CoprocessorServiceResponse\022&\n\005M" +
+      "ulti\022\r.MultiRequest\032\016.MultiResponseBB\n*o" +
+      "rg.apache.hadoop.hbase.protobuf.generate" +
+      "dB\014ClientProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -32842,13 +33079,13 @@ public final class ClientProtos {
           internal_static_ScanRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_ScanRequest_descriptor,
-              new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", });
+              new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", });
           internal_static_ScanResponse_descriptor =
             getDescriptor().getMessageTypes().get(13);
           internal_static_ScanResponse_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_ScanResponse_descriptor,
-              new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", });
+              new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "HeartbeatMessage", });
           internal_static_BulkLoadHFileRequest_descriptor =
             getDescriptor().getMessageTypes().get(14);
           internal_static_BulkLoadHFileRequest_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-protocol/src/main/protobuf/Client.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto
index e0c370b..3a48cc8 100644
--- a/hbase-protocol/src/main/protobuf/Client.proto
+++ b/hbase-protocol/src/main/protobuf/Client.proto
@@ -275,6 +275,7 @@ message ScanRequest {
   optional bool close_scanner = 5;
   optional uint64 next_call_seq = 6;
   optional bool client_handles_partials = 7;
+  optional bool client_handles_heartbeats = 8;
 }
 
 /**
@@ -313,6 +314,12 @@ message ScanResponse {
   // reasons such as the size in bytes or quantity of results accumulated. This field
   // will true when more results exist in the current region.
   optional bool more_results_in_region = 8;
+
+  // This field is filled in if the server is sending back a heartbeat message.
+  // Heartbeat messages are sent back to the client to prevent the scanner from
+  // timing out. Seeing a heartbeat message communicates to the Client that the
+  // server would have continued to scan had the time limit not been reached.
+  optional bool heartbeat_message = 9;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
index 5809983..9d7bcc0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.mortbay.log.Log;
 
@@ -72,8 +71,7 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
   @Override
   public Result next() throws IOException {
     values.clear();
-
-    scanner.nextRaw(values, NoLimitScannerContext.getInstance());
+    scanner.nextRaw(values);
     if (values.isEmpty()) {
       //we are done
       return null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 96d5c77..c731923 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5288,8 +5288,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
 
     @Override
-    public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)
-        throws IOException {
+    public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext) throws IOException {
       if (this.filterClosed) {
         throw new UnknownScannerException("Scanner was closed (timed out?) " +
             "after we renewed it. Could be caused by a very slow scanner " +
@@ -5327,7 +5326,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         moreValues = nextInternal(tmpList, scannerContext);
         outResults.addAll(tmpList);
       }
-
+      
       // If the size limit was reached it means a partial Result is being returned. Returning a
       // partial Result means that we should not reset the filters; filters should only be reset in
       // between rows
@@ -5395,6 +5394,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           ScannerContext.NextState state =
               moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
           return scannerContext.setScannerState(state).hasMoreValues();
+        } else if (scannerContext.checkTimeLimit(limitScope)) {
+          ScannerContext.NextState state =
+              moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
+          return scannerContext.setScannerState(state).hasMoreValues();
         }
       } while (moreCellsInRow);
 
@@ -5443,6 +5446,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // progress.
       int initialBatchProgress = scannerContext.getBatchProgress();
       long initialSizeProgress = scannerContext.getSizeProgress();
+      long initialTimeProgress = scannerContext.getTimeProgress();
 
       // The loop here is used only when at some point during the next we determine
       // that due to effects of filters or otherwise, we have an empty row in the result.
@@ -5454,7 +5458,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // progress should be kept.
         if (scannerContext.getKeepProgress()) {
           // Progress should be kept. Reset to initial values seen at start of method invocation.
-          scannerContext.setProgress(initialBatchProgress, initialSizeProgress);
+          scannerContext
+              .setProgress(initialBatchProgress, initialSizeProgress, initialTimeProgress);
         } else {
           scannerContext.clearProgress();
         }
@@ -5502,6 +5507,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
                 + " formed. Changing scope of limits that may create partials");
           }
           scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
+          scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
         }
 
         // Check if we were getting data from the joinedHeap and hit the limit.
@@ -5537,6 +5543,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             }
             return true;
           }
+
           Cell nextKv = this.storeHeap.peek();
           stopRow = nextKv == null ||
               isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
@@ -5550,12 +5557,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             ret = filter.filterRowCellsWithRet(results);
 
             // We don't know how the results have changed after being filtered. Must set progress
-            // according to contents of results now.
+            // according to contents of results now. However, a change in the results should not
+            // affect the time progress. Thus preserve whatever time progress has been made
+            long timeProgress = scannerContext.getTimeProgress();
             if (scannerContext.getKeepProgress()) {
-              scannerContext.setProgress(initialBatchProgress, initialSizeProgress);
+              scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
+                initialTimeProgress);
             } else {
               scannerContext.clearProgress();
             }
+            scannerContext.setTimeProgress(timeProgress);
             scannerContext.incrementBatchProgress(results.size());
             for (Cell cell : results) {
               scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
@@ -5580,14 +5591,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           // These values are not needed for filter to work, so we postpone their
           // fetch to (possibly) reduce amount of data loads from disk.
           if (this.joinedHeap != null) {
-            Cell nextJoinedKv = joinedHeap.peek();
-            // If joinedHeap is pointing to some other row, try to seek to a correct one.
-            boolean mayHaveData = (nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv,
-                currentRow, offset, length))
-                || (this.joinedHeap.requestSeek(
-                    KeyValueUtil.createFirstOnRow(currentRow, offset, length), true, true)
-                    && joinedHeap.peek() != null && CellUtil.matchingRow(joinedHeap.peek(),
-                    currentRow, offset, length));
+            boolean mayHaveData = joinedHeapMayHaveData(currentRow, offset, length);
             if (mayHaveData) {
               joinedContinuationRow = current;
               populateFromJoinedHeap(results, scannerContext);
@@ -5631,6 +5635,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
 
     /**
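+     * @param currentRow byte array holding the row that is currently being processed
+     * @param offset offset at which the row starts within {@code currentRow}
+     * @param length length of the row within {@code currentRow}
+     * @return true when the joined heap may have data for the current row
+     * @throws IOException if peeking at or seeking the joined heap fails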
+     */
+    private boolean joinedHeapMayHaveData(byte[] currentRow, int offset, short length)
+        throws IOException {
+      Cell nextJoinedKv = joinedHeap.peek();
+      boolean matchCurrentRow =
+          nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv, currentRow, offset, length);
+      boolean matchAfterSeek = false;
+
+      // If the next value in the joined heap does not match the current row, try to seek to the
+      // correct row
+      if (!matchCurrentRow) {
+        Cell firstOnCurrentRow = KeyValueUtil.createFirstOnRow(currentRow, offset, length);
+        boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
+        matchAfterSeek =
+            seekSuccessful && joinedHeap.peek() != null
+                && CellUtil.matchingRow(joinedHeap.peek(), currentRow, offset, length);
+      }
+
+      return matchCurrentRow || matchAfterSeek;
+    }
+
+    /**
      * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines
      * both filterRow & filterRow(List<KeyValue> kvs) functions. While 0.94 code or older, it may
      * not implement hasFilterRow as HBase-6429 expects because 0.94 hasFilterRow() only returns

http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
index 761267f..6433453 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
@@ -144,6 +144,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
     InternalScanner currentAsInternal = (InternalScanner)this.current;
     boolean moreCells = currentAsInternal.next(result, scannerContext);
     Cell pee = this.current.peek();
+
     /*
      * By definition, any InternalScanner must return false only when it has no
      * further rows to be fetched. So, we can close a scanner if it returns
@@ -151,6 +152,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
      * more efficient to close scanners which are not needed than keep them in
      * the heap. This is also required for certain optimizations.
      */
+
     if (pee == null || !moreCells) {
       this.current.close();
     } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
index 1484e80..66ed6c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
@@ -68,7 +68,22 @@ public class NoLimitScannerContext extends ScannerContext {
   }
 
   @Override
-  void setProgress(int batchProgress, long sizeProgress) {
+  void setTimeProgress(long timeProgress) {
+    // Do nothing. NoLimitScannerContext instances are immutable post-construction
+  }
+
+  @Override
+  void updateTimeProgress() {
+    // Do nothing. NoLimitScannerContext instances are immutable post-construction
+  }
+
+  @Override
+  void setProgress(int batchProgress, long sizeProgress, long timeProgress) {
+    // Do nothing. NoLimitScannerContext instances are immutable post-construction
+  }
+
+  @Override
+  void clearProgress() {
     // Do nothing. NoLimitScannerContext instances are immutable post-construction
   }
 
@@ -78,6 +93,11 @@ public class NoLimitScannerContext extends ScannerContext {
   }
 
   @Override
+  void setTimeLimitScope(LimitScope scope) {
+    // Do nothing. NoLimitScannerContext instances are immutable post-construction
+  }
+
+  @Override
   NextState setScannerState(NextState state) {
     // Do nothing. NoLimitScannerContext instances are immutable post-construction
     return state;
@@ -96,6 +116,12 @@ public class NoLimitScannerContext extends ScannerContext {
   }
 
   @Override
+  boolean checkTimeLimit(LimitScope checkerScope) {
+    // No limits can be specified, thus return false to indicate no limit has been reached.
+    return false;
+  }
+
+  @Override
   boolean checkAnyLimitReached(LimitScope checkerScope) {
     return false;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 38956cc..f322862 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -191,6 +191,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
     "hbase.region.server.rpc.scheduler.factory.class";
 
+  /**
+   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
+   * configuration exists to prevent the scenario where a time limit is specified to be so
+   * restrictive that the time limit is reached immediately (before any cells are scanned).
+   */
+  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
+      "hbase.region.server.rpc.minimum.scan.time.limit.delta";
+  /**
+   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
+   */
+  private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
+
   // Request counter. (Includes requests that are not serviced by regions.)
   final Counter requestCount = new Counter();
   // Server to handle client requests.
@@ -213,6 +225,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   private final int scannerLeaseTimeoutPeriod;
 
   /**
+   * The RPC timeout period (milliseconds)
+   */
+  private final int rpcTimeout;
+
+  /**
+   * The minimum allowable delta to use for the scan limit
+   */
+  private final long minimumScanTimeLimitDelta;
+
+  /**
    * Holder class which holds the RegionScanner and nextCallSeq together.
    */
   private static class RegionScannerHolder {
@@ -829,6 +851,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     maxScannerResultSize = rs.conf.getLong(
       HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
       HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
+    rpcTimeout = rs.conf.getInt(
+      HConstants.HBASE_RPC_TIMEOUT_KEY,
+      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+    minimumScanTimeLimitDelta = rs.conf.getLong(
+      REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
+      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
 
     // Set our address, however we need the final port that was given to rpcServer
     isa = new InetSocketAddress(initialIsa.getHostName(), rpcServer.getListenerAddress().getPort());
@@ -2241,6 +2269,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
                 boolean stale = (region.getRegionInfo().getReplicaId() != 0);
                 boolean clientHandlesPartials =
                     request.hasClientHandlesPartials() && request.getClientHandlesPartials();
+                boolean clientHandlesHeartbeats =
+                    request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
 
                 // On the server side we must ensure that the correct ordering of partial results is
                 // returned to the client to allow them to properly reconstruct the partial results.
@@ -2252,23 +2282,52 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
                     clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
                 boolean moreRows = false;
 
+                // Heartbeat messages occur when the processing of the ScanRequest exceeds a
+                // certain time threshold on the server. When the time threshold is exceeded, the
+                // server stops the scan and sends back whatever Results it has accumulated within
+                // that time period (may be empty). Since heartbeat messages have the potential to
+                // create partial Results (in the event that the timeout occurs in the middle of a
+                // row), we must only generate heartbeat messages when the client can handle both
+                // heartbeats AND partials
+                boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
+
+                // Default value of timeLimit is negative to indicate no timeLimit should be
+                // enforced.
+                long timeLimit = -1;
+
+                // Set the time limit to be half of the more restrictive timeout value (one of the
+                // timeout values must be positive). In the event that both values are positive, the
+                // more restrictive of the two is used to calculate the limit.
+                if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
+                  long timeLimitDelta;
+                  if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
+                    timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
+                  } else {
+                    timeLimitDelta =
+                        scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
+                  }
+                  // Use half of whichever timeout value was more restrictive... But don't allow
+                  // the time limit to be less than the allowable minimum (could cause an
+                  // immediate timeout before scanning any data).
+                  timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
+                  timeLimit = System.currentTimeMillis() + timeLimitDelta;
+                }
+
                 final LimitScope sizeScope =
                     allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
+                final LimitScope timeScope =
+                    allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
 
                 // Configure with limits for this RPC. Set keep progress true since size progress
                 // towards size limit should be kept between calls to nextRaw
                 ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
                 contextBuilder.setSizeLimit(sizeScope, maxResultSize);
                 contextBuilder.setBatchLimit(scanner.getBatch());
+                contextBuilder.setTimeLimit(timeScope, timeLimit);
                 ScannerContext scannerContext = contextBuilder.build();
 
+                boolean limitReached = false;
                 while (i < rows) {
-                  // Stop collecting results if we have exceeded maxResultSize
-                  if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS)) {
-                    builder.setMoreResultsInRegion(true);
-                    break;
-                  }
-
                   // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
                   // batch limit is a limit on the number of cells per Result. Thus, if progress is
                   // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
@@ -2287,14 +2346,31 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
                     results.add(Result.create(values, null, stale, partial));
                     i++;
                   }
-                  if (!moreRows) {
+
+                  boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
+                  boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
+                  boolean rowLimitReached = i >= rows;
+                  limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
+
+                  if (limitReached || !moreRows) {
+                    if (LOG.isTraceEnabled()) {
+                      LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: "
+                          + moreRows + " scannerContext: " + scannerContext);
+                    }
+                    // We only want to mark a ScanResponse as a heartbeat message in the event that
+                    // there are more values to be read server side. If there aren't more values,
+                    // marking it as a heartbeat is wasteful because the client will need to issue
+                    // another ScanRequest only to realize that they already have all the values
+                    if (moreRows) {
+                      // Heartbeat messages occur when the time limit has been reached.
+                      builder.setHeartbeatMessage(timeLimitReached);
+                    }
                     break;
                   }
                   values.clear();
                 }
 
-                if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS) || i >= rows || 
-                    moreRows) {
+                if (limitReached || moreRows) {
                   // We stopped prematurely
                   builder.setMoreResultsInRegion(true);
                 } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index 6e487ca..7c8ff7d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -101,7 +101,7 @@ public class ScannerContext {
     if (limitsToCopy != null) this.limits.copy(limitsToCopy);
 
     // Progress fields are initialized to 0
-    progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0);
+    progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0);
 
     this.keepProgress = keepProgress;
     this.scannerState = DEFAULT_STATE;
@@ -137,6 +137,13 @@ public class ScannerContext {
     progress.setSize(currentSize + size);
   }
 
+  /**
+   * Update the time progress with {@link System#currentTimeMillis()}
+   */
+  void updateTimeProgress() {
+    progress.setTime(System.currentTimeMillis());
+  }
+
   int getBatchProgress() {
     return progress.getBatch();
   }
@@ -145,9 +152,14 @@ public class ScannerContext {
     return progress.getSize();
   }
 
-  void setProgress(int batchProgress, long sizeProgress) {
+  long getTimeProgress() {
+    return progress.getTime();
+  }
+
+  void setProgress(int batchProgress, long sizeProgress, long timeProgress) {
     setBatchProgress(batchProgress);
     setSizeProgress(sizeProgress);
+    setTimeProgress(timeProgress);
   }
 
   void setSizeProgress(long sizeProgress) {
@@ -158,12 +170,16 @@ public class ScannerContext {
     progress.setBatch(batchProgress);
   }
 
+  void setTimeProgress(long timeProgress) {
+    progress.setTime(timeProgress);
+  }
+
   /**
    * Clear away any progress that has been made so far. All progress fields are reset to initial
    * values
    */
   void clearProgress() {
-    progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0);
+    progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0);
   }
 
   /**
@@ -172,7 +188,7 @@ public class ScannerContext {
    * allows the {@link NoLimitScannerContext} to cleanly override this setter and simply return the
    * new state, thus preserving the immutability of {@link NoLimitScannerContext}
    * @param state
-   * @return The state that
+   * @return The state that was passed in.
    */
   NextState setScannerState(NextState state) {
     if (!NextState.isValidState(state)) {
@@ -188,7 +204,8 @@ public class ScannerContext {
    *         reached in the middle of a row.
    */
   boolean partialResultFormed() {
-    return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW;
+    return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW
+        || scannerState == NextState.TIME_LIMIT_REACHED_MID_ROW;
   }
 
   /**
@@ -209,10 +226,18 @@ public class ScannerContext {
 
   /**
    * @param checkerScope
+   * @return true if the time limit can be enforced in the checker's scope
+   */
+  boolean hasTimeLimit(LimitScope checkerScope) {
+    return limits.canEnforceTimeLimitFromScope(checkerScope) && limits.getTime() > 0;
+  }
+
+  /**
+   * @param checkerScope
    * @return true if any limit can be enforced within the checker's scope
    */
   boolean hasAnyLimit(LimitScope checkerScope) {
-    return hasBatchLimit(checkerScope) || hasSizeLimit(checkerScope);
+    return hasBatchLimit(checkerScope) || hasSizeLimit(checkerScope) || hasTimeLimit(checkerScope);
   }
 
   /**
@@ -222,6 +247,13 @@ public class ScannerContext {
     limits.setSizeScope(scope);
   }
 
+  /**
+   * @param scope The scope in which the time limit will be enforced
+   */
+  void setTimeLimitScope(LimitScope scope) {
+    limits.setTimeScope(scope);
+  }
+
   int getBatchLimit() {
     return limits.getBatch();
   }
@@ -230,6 +262,10 @@ public class ScannerContext {
     return limits.getSize();
   }
 
+  long getTimeLimit() {
+    return limits.getTime();
+  }
+
   /**
    * @param checkerScope The scope that the limit is being checked from
    * @return true when the limit is enforceable from the checker's scope and it has been reached
@@ -247,11 +283,21 @@ public class ScannerContext {
   }
 
   /**
+   * @param checkerScope The scope that the limit is being checked from. The time limit is
+   *          compared against the recorded time progress (see {@link #updateTimeProgress()})
+   * @return true when the limit is enforceable from the checker's scope and it has been reached
+   */
+  boolean checkTimeLimit(LimitScope checkerScope) {
+    return hasTimeLimit(checkerScope) && progress.getTime() >= limits.getTime();
+  }
+
+  /**
    * @param checkerScope The scope that the limits are being checked from
    * @return true when some limit is enforceable from the checker's scope and it has been reached
    */
   boolean checkAnyLimitReached(LimitScope checkerScope) {
-    return checkSizeLimit(checkerScope) || checkBatchLimit(checkerScope);
+    return checkSizeLimit(checkerScope) || checkBatchLimit(checkerScope)
+        || checkTimeLimit(checkerScope);
   }
 
   @Override
@@ -305,6 +351,12 @@ public class ScannerContext {
       return this;
     }
 
+    public Builder setTimeLimit(LimitScope timeScope, long timeLimit) {
+      limits.setTime(timeLimit);
+      limits.setTimeScope(timeScope);
+      return this;
+    }
+
     public Builder setBatchLimit(int batchLimit) {
       limits.setBatch(batchLimit);
       return this;
@@ -328,6 +380,13 @@ public class ScannerContext {
      * of a row and thus a partial results was formed
      */
     SIZE_LIMIT_REACHED_MID_ROW(true, true),
+    TIME_LIMIT_REACHED(true, true),
+
+    /**
+     * Special case of time limit reached to indicate that the time limit was reached in the middle
+     * of a row and thus a partial result was formed
+     */
+    TIME_LIMIT_REACHED_MID_ROW(true, true),
     BATCH_LIMIT_REACHED(true, true);
 
     private boolean moreValues;
@@ -419,6 +478,7 @@ public class ScannerContext {
      */
     private static int DEFAULT_BATCH = -1;
     private static long DEFAULT_SIZE = -1L;
+    private static long DEFAULT_TIME = -1L;
 
     /**
      * Default scope that is assigned to a limit if a scope is not specified.
@@ -432,19 +492,23 @@ public class ScannerContext {
     LimitScope sizeScope = DEFAULT_SCOPE;
     long size = DEFAULT_SIZE;
 
+    LimitScope timeScope = DEFAULT_SCOPE;
+    long time = DEFAULT_TIME;
+
     /**
      * Fields keep their default values.
      */
     LimitFields() {
     }
 
-    LimitFields(int batch, LimitScope sizeScope, long size) {
-      setFields(batch, sizeScope, size);
+    LimitFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) {
+      setFields(batch, sizeScope, size, timeScope, time);
     }
 
     void copy(LimitFields limitsToCopy) {
       if (limitsToCopy != null) {
-        setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getSize());
+        setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getSize(),
+          limitsToCopy.getTimeScope(), limitsToCopy.getTime());
       }
     }
 
@@ -454,10 +518,12 @@ public class ScannerContext {
      * @param sizeScope
      * @param size
      */
-    void setFields(int batch, LimitScope sizeScope, long size) {
+    void setFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) {
       setBatch(batch);
       setSizeScope(sizeScope);
       setSize(size);
+      setTimeScope(timeScope);
+      setTime(time);
     }
 
     int getBatch() {
@@ -506,6 +572,36 @@ public class ScannerContext {
       return this.sizeScope.canEnforceLimitFromScope(checkerScope);
     }
 
+    long getTime() {
+      return this.time;
+    }
+
+    void setTime(long time) {
+      this.time = time;
+    }
+
+    /**
+     * @return {@link LimitScope} indicating scope in which the time limit is enforced
+     */
+    LimitScope getTimeScope() {
+      return this.timeScope;
+    }
+
+    /**
+     * Change the scope in which the time limit is enforced
+     */
+    void setTimeScope(LimitScope scope) {
+      this.timeScope = scope;
+    }
+
+    /**
+     * @param checkerScope The scope that the time limit is being checked from
+     * @return true when the time limit (per {@code timeScope}) can be enforced from that scope
+     */
+    boolean canEnforceTimeLimitFromScope(LimitScope checkerScope) {
+      return this.timeScope.canEnforceLimitFromScope(checkerScope);
+    }
+
     @Override
     public String toString() {
       StringBuilder sb = new StringBuilder();
@@ -520,6 +616,12 @@ public class ScannerContext {
       sb.append(", sizeScope:");
       sb.append(sizeScope);
 
+      sb.append(", time:");
+      sb.append(time);
+
+      sb.append(", timeScope:");
+      sb.append(timeScope);
+
       sb.append("}");
       return sb.toString();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 665ed46..f7e06ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -83,6 +83,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   protected final long now;
   protected final int minVersions;
   protected final long maxRowSize;
+  protected final long cellsPerHeartbeatCheck;
 
   /**
    * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
@@ -100,6 +101,19 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   protected static boolean lazySeekEnabledGlobally =
       LAZY_SEEK_ENABLED_BY_DEFAULT;
 
+  /**
+   * The number of cells scanned in between timeout checks. Specifying a larger value means that
+   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
+   * timeout checks.
+   */
+  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
+      "hbase.cells.scanned.per.heartbeat.check";
+
+  /**
+   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
+   */
+  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;
+
   // if heap == null and lastTop != null, you need to reseek given the key below
   protected Cell lastTop = null;
 
@@ -137,9 +151,17 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       this.maxRowSize =
           conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
       this.scanUsePread = conf.getBoolean("hbase.storescanner.use.pread", scan.isSmall());
+
+      long tmpCellsPerTimeoutCheck =
+          conf.getLong(HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK,
+            DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK);
+      this.cellsPerHeartbeatCheck =
+          tmpCellsPerTimeoutCheck > 0 ? tmpCellsPerTimeoutCheck
+              : DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
     } else {
       this.maxRowSize = HConstants.TABLE_MAX_ROWSIZE_DEFAULT;
       this.scanUsePread = scan.isSmall();
+      this.cellsPerHeartbeatCheck = DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
     }
 
     // We look up row-column Bloom filters for multi-column queries as part of
@@ -458,6 +480,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   @Override
   public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
     lock.lock();
+
     try {
     if (scannerContext == null) {
       throw new IllegalArgumentException("Scanner context cannot be null");
@@ -507,6 +530,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     long totalBytesRead = 0;
 
     LOOP: while((cell = this.heap.peek()) != null) {
+      // Update and check the time limit based on the configured value of cellsPerHeartbeatCheck
+      if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
+        scannerContext.updateTimeProgress();
+        if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
+          return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
+        }
+      }
+
       if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
       checkScanOrder(prevCell, cell, comparator);
       prevCell = cell;

http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
index eef955e..3794e59 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
@@ -217,7 +217,8 @@ public class TestPartialResultsFromClientSide {
       count++;
     }
 
-    assertTrue(scanner2.next() == null);
+    r2 = scanner2.next();
+    assertTrue("r2: " + r2 + " Should be null", r2 == null);
 
     scanner1.close();
     scanner2.close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/abe3796a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
index a8b5456..82fa3c2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
@@ -144,7 +144,6 @@ public class TestCoprocessorInterface {
     public int getBatch() {
       return delegate.getBatch();
     }
-
   }
 
   public static class CoprocessorImpl extends BaseRegionObserver {