You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2015/06/21 16:34:33 UTC

hbase git commit: HBASE-13926 Close the scanner only after Call#setResponse.

Repository: hbase
Updated Branches:
  refs/heads/master 04c25e0f3 -> e4d8fab10


HBASE-13926 Close the scanner only after Call#setResponse.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e4d8fab1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e4d8fab1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e4d8fab1

Branch: refs/heads/master
Commit: e4d8fab104d6c4edc8721ee14238fcc59bd203d0
Parents: 04c25e0
Author: anoopsjohn <an...@gmail.com>
Authored: Sun Jun 21 20:04:13 2015 +0530
Committer: anoopsjohn <an...@gmail.com>
Committed: Sun Jun 21 20:04:13 2015 +0530

----------------------------------------------------------------------
 .../hadoop/hbase/io/HalfStoreFileReader.java    |   5 +
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java  |   5 +
 .../hadoop/hbase/io/hfile/HFileScanner.java     |   3 +-
 .../apache/hadoop/hbase/ipc/RpcCallContext.java |   8 ++
 .../apache/hadoop/hbase/ipc/RpcCallback.java    |  36 ++++++
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |  16 +++
 .../hadoop/hbase/regionserver/HRegion.java      |  19 ++-
 .../hadoop/hbase/regionserver/KeyValueHeap.java |  16 +++
 .../hbase/regionserver/KeyValueScanner.java     |   2 +-
 .../hadoop/hbase/regionserver/Leases.java       |   7 +-
 .../regionserver/NonLazyKeyValueScanner.java    |   5 +
 .../hbase/regionserver/RSRpcServices.java       | 117 +++++++++++++++----
 .../hbase/regionserver/RegionScanner.java       |   2 +-
 .../hadoop/hbase/regionserver/Shipper.java      |  37 ++++++
 .../hbase/regionserver/StoreFileScanner.java    |   5 +
 .../hadoop/hbase/regionserver/StoreScanner.java |  16 +++
 .../coprocessor/TestCoprocessorInterface.java   |   5 +
 17 files changed, 277 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e4d8fab1/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
index 7a4a333..5f17c3b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
@@ -293,6 +293,11 @@ public class HalfStoreFileReader extends StoreFile.Reader {
       public void close() {
         this.delegate.close();
       }
+
+      @Override
+      public void shipped() throws IOException {
+        this.delegate.shipped();
+      }
     };
   }
   

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4d8fab1/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index c6655c1..4bd85a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1083,6 +1083,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     public void close() {
       // HBASE-12295 will add code here.
     }
+
+    @Override
+    public void shipped() throws IOException {
+      // HBASE-12295 will add code here.
+    }
   }
 
   public Path getPath() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4d8fab1/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
index 4d9990e..6120e71 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.Shipper;
 import org.apache.hadoop.hbase.Cell;
 
 /**
@@ -37,7 +38,7 @@ import org.apache.hadoop.hbase.Cell;
  * getValue.
  */
 @InterfaceAudience.Private
-public interface HFileScanner {
+public interface HFileScanner extends Shipper {
   /**
    * SeekTo or just before the passed <code>cell</code>.  Examine the return
    * code to figure whether we found the cell or not.

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4d8fab1/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
index bb63e01..268b34f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
@@ -63,4 +63,12 @@ public interface RpcCallContext extends Delayable {
    * @return the client version info, or null if the information is not present
    */
   VersionInfo getClientVersionInfo();
+
+  /**
+   * Sets a callback which has to be executed at the end of this RPC call. Such a callback is an
+   * optional one for any Rpc call.
+   *
+   * @param callback
+   */
+  void setCallBack(RpcCallback callback);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4d8fab1/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallback.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallback.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallback.java
new file mode 100644
index 0000000..90b7a87
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallback.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Denotes a callback action that has to be executed at the end of an Rpc Call.
+ *
+ * @see RpcCallContext#setCallBack(RpcCallback)
+ */
+@InterfaceAudience.Private
+public interface RpcCallback {
+
+  /**
+   * Called at the end of an Rpc Call {@link RpcCallContext}
+   */
+  void run() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4d8fab1/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 3557768..cafd25d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -306,6 +306,7 @@ public class RpcServer implements RpcServerInterface {
 
     private User user;
     private InetAddress remoteAddress;
+    private RpcCallback callback;
 
     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
          Message param, CellScanner cellScanner, Connection connection, Responder responder,
@@ -431,6 +432,16 @@ public class RpcServer implements RpcServerInterface {
         LOG.warn("Exception while creating response " + e);
       }
       this.response = bc;
+      // Once a response message is created and set to this.response, this Call can be treated as
+      // done. The Responder thread will do the n/w write of this message back to client.
+      if (this.callback != null) {
+        try {
+          this.callback.run();
+        } catch (Exception e) {
+          // Don't allow any exception here to kill this handler thread.
+          LOG.warn("Exception while running the Rpc Callback.", e);
+        }
+      }
     }
 
     private BufferChain wrapWithSasl(BufferChain bc)
@@ -553,6 +564,11 @@ public class RpcServer implements RpcServerInterface {
     public VersionInfo getClientVersionInfo() {
       return connection.getVersionInfo();
     }
+
+    @Override
+    public void setCallBack(RpcCallback callback) {
+      this.callback = callback;
+    }
   }
 
   /** Listens on the socket. Creates jobs for the handler threads*/

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4d8fab1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a3e862d..7a69e32 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5225,7 +5225,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   /**
    * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
    */
-  class RegionScannerImpl implements RegionScanner {
+  class RegionScannerImpl implements RegionScanner, org.apache.hadoop.hbase.ipc.RpcCallback {
     // Package local for testability
     KeyValueHeap storeHeap = null;
     /** Heap of key-values that are not essential for the provided filters and are thus read
@@ -5830,6 +5830,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       throw new UnsupportedOperationException("not able to abort RS after: " + msg);
     }
+
+    @Override
+    public void shipped() throws IOException {
+      if (storeHeap != null) {
+        storeHeap.shipped();
+      }
+      if (joinedHeap != null) {
+        joinedHeap.shipped();
+      }
+    }
+
+    @Override
+    public void run() throws IOException {
+      // This is the RPC callback method executed. We do the close in of the scanner in this
+      // callback
+      this.close();
+    }
   }
 
   // Utility methods

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4d8fab1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
index a12e7c3..2b9d0f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
@@ -406,4 +406,20 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
     // here we return the next index key from the top scanner
     return current == null ? null : current.getNextIndexedKey();
   }
+
+  @Override
+  public void shipped() throws IOException {
+    for (KeyValueScanner scanner : this.scannersForDelayedClose) {
+      scanner.close(); // There wont be further fetch of Cells from these scanners. Just close.
+    }
+    this.scannersForDelayedClose.clear();
+    if (this.current != null) {
+      this.current.shipped();
+    }
+    if (this.heap != null) {
+      for (KeyValueScanner scanner : this.heap) {
+        scanner.shipped();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4d8fab1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
index 76a9d0f..9a62b8f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.client.Scan;
  * Scanner that returns the next KeyValue.
  */
 @InterfaceAudience.Private
-public interface KeyValueScanner {
+public interface KeyValueScanner extends Shipper {
   /**
    * Look at the next Cell in this scanner, but do not iterate scanner.
    * @return the next Cell

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4d8fab1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java
index 83b9fb1..1373e27 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java
@@ -155,11 +155,14 @@ public class Leases extends HasThread {
    * @param leaseName name of the lease
    * @param leaseTimeoutPeriod length of the lease in milliseconds
    * @param listener listener that will process lease expirations
+   * @return The lease created.
    * @throws LeaseStillHeldException
    */
-  public void createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
+  public Lease createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
       throws LeaseStillHeldException {
-    addLease(new Lease(leaseName, leaseTimeoutPeriod, listener));
+    Lease lease = new Lease(leaseName, leaseTimeoutPeriod, listener);
+    addLease(lease);
+    return lease;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4d8fab1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
index 957f417..9a9036b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
@@ -71,4 +71,9 @@ public abstract class NonLazyKeyValueScanner implements KeyValueScanner {
   public Cell getNextIndexedKey() {
     return null;
   }
+
+  @Override
+  public void shipped() throws IOException {
+    // do nothing
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4d8fab1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 5fd285a..6dc9f4e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.PriorityFunction;
 import org.apache.hadoop.hbase.ipc.QosPriority;
 import org.apache.hadoop.hbase.ipc.RpcCallContext;
+import org.apache.hadoop.hbase.ipc.RpcCallback;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
@@ -154,6 +155,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
 import org.apache.hadoop.hbase.quotas.OperationQuota;
 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.regionserver.Leases.Lease;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
 import org.apache.hadoop.hbase.regionserver.Region.Operation;
@@ -239,16 +241,64 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   private final long minimumScanTimeLimitDelta;
 
   /**
-   * Holder class which holds the RegionScanner and nextCallSeq together.
+   * An Rpc callback for closing a RegionScanner.
+   */
+  private static class RegionScannerCloseCallBack implements RpcCallback {
+
+    private final RegionScanner scanner;
+
+    public RegionScannerCloseCallBack(RegionScanner scanner){
+      this.scanner = scanner;
+    }
+
+    @Override
+    public void run() throws IOException {
+      this.scanner.close();
+    }
+  }
+
+  /**
+   * An Rpc callback for doing shipped() call on a RegionScanner.
+   */
+  private class RegionScannerShippedCallBack implements RpcCallback {
+
+    private final String scannerName;
+    private final RegionScanner scanner;
+    private final Lease lease;
+
+    public RegionScannerShippedCallBack(String scannerName, RegionScanner scanner, Lease lease) {
+      this.scannerName = scannerName;
+      this.scanner = scanner;
+      this.lease = lease;
+    }
+
+    @Override
+    public void run() throws IOException {
+      this.scanner.shipped();
+      // We're done. On way out re-add the above removed lease. The lease was temp removed for this
+      // Rpc call and we are at end of the call now. Time to add it back.
+      if (scanners.containsKey(scannerName)) {
+        if (lease != null) regionServer.leases.addLease(lease);
+      }
+    }
+  }
+
+  /**
+   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
    */
   private static class RegionScannerHolder {
     private AtomicLong nextCallSeq = new AtomicLong(0);
     private RegionScanner s;
     private Region r;
+    final RpcCallback closeCallBack;
+    final RpcCallback shippedCallback;
 
-    public RegionScannerHolder(RegionScanner s, Region r) {
+    public RegionScannerHolder(RegionScanner s, Region r, RpcCallback closeCallBack,
+        RpcCallback shippedCallback) {
       this.s = s;
       this.r = r;
+      this.closeCallBack = closeCallBack;
+      this.shippedCallback = shippedCallback;
     }
 
     private long getNextCallSeq() {
@@ -364,6 +414,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     return context != null && context.isClientCellBlockSupport();
   }
 
+  private boolean isClientCellBlockSupport(RpcCallContext context) {
+    return context != null && context.isClientCellBlockSupport();
+  }
+
   private void addResult(final MutateResponse.Builder builder,
       final Result result, final PayloadCarryingRpcController rpcc) {
     if (result == null) return;
@@ -377,10 +431,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   }
 
   private void addResults(final ScanResponse.Builder builder, final List<Result> results,
-      final RpcController controller, boolean isDefaultRegion) {
+      final RpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
     builder.setStale(!isDefaultRegion);
     if (results == null || results.isEmpty()) return;
-    if (isClientCellBlockSupport()) {
+    if (clientCellBlockSupported) {
       for (Result res : results) {
         builder.addCellsPerResult(res.size());
         builder.addPartialFlagPerResult(res.isPartial());
@@ -923,17 +977,21 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     return 0L;
   }
 
-  long addScanner(RegionScanner s, Region r) throws LeaseStillHeldException {
-    long scannerId = this.scannerIdGen.incrementAndGet();
-    String scannerName = String.valueOf(scannerId);
-
-    RegionScannerHolder existing =
-      scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
-    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
-
-    regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
+  RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r)
+      throws LeaseStillHeldException {
+    Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
         new ScannerListener(scannerName));
-    return scannerId;
+    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, s, lease);
+    RpcCallback closeCallback;
+    if (s instanceof RpcCallback) {
+      closeCallback = (RpcCallback) s;
+    } else {
+      closeCallback = new RegionScannerCloseCallBack(s);
+    }
+    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback);
+    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
+    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
+    return rsh;
   }
 
   /**
@@ -2229,11 +2287,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         if (region.getCoprocessorHost() != null) {
           scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
         }
-        scannerId = addScanner(scanner, region);
+        scannerId = this.scannerIdGen.incrementAndGet();
         scannerName = String.valueOf(scannerId);
+        rsh = addScanner(scannerName, scanner, region);
         ttl = this.scannerLeaseTimeoutPeriod;
       }
 
+      RpcCallContext context = RpcServer.getCurrentCall();
+
       quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
       long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
 
@@ -2451,7 +2512,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
             moreResults = false;
             results = null;
           } else {
-            addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
+            addResults(builder, results, controller,
+                RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
+                isClientCellBlockSupport(context));
           }
         } catch (IOException e) {
           // if we have an exception on scanner next and we are using the callSeq
@@ -2462,11 +2525,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           }
           throw e;
         } finally {
-          // We're done. On way out re-add the above removed lease.
+          if (context != null) {
+            context.setCallBack(rsh.shippedCallback);
+          }
           // Adding resets expiration time on lease.
           if (scanners.containsKey(scannerName)) {
-            if (lease != null) regionServer.leases.addLease(lease);
             ttl = this.scannerLeaseTimeoutPeriod;
+            // When context != null, adding back the lease will be done in callback set above.
+            if (context == null) {
+              if (lease != null) regionServer.leases.addLease(lease);
+            }
           }
         }
       }
@@ -2481,9 +2549,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         }
         rsh = scanners.remove(scannerName);
         if (rsh != null) {
-          scanner = rsh.s;
-          scanner.close();
-          regionServer.leases.cancelLease(scannerName);
+          if (context != null) {
+            context.setCallBack(rsh.closeCallBack);
+          } else {
+            rsh.s.close();
+          }
+          try {
+            regionServer.leases.cancelLease(scannerName);
+          } catch (LeaseException le) {
+            // No problem, ignore
+          }
           if (region != null && region.getCoprocessorHost() != null) {
             region.getCoprocessorHost().postScannerClose(scanner);
           }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4d8fab1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
index 1bc6546..9e7ff0f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
 @InterfaceStability.Evolving
-public interface RegionScanner extends InternalScanner {
+public interface RegionScanner extends InternalScanner, Shipper {
   /**
    * @return The RegionInfo for this scanner.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4d8fab1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java
new file mode 100644
index 0000000..fb66f51
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This interface denotes a scanner as one which can ship cells. Scan operation do many RPC requests
+ * to server and fetch N rows/RPC. These are then shipped to client. At the end of every such batch
+ * {@link #shipped()} will get called.
+ */
+@InterfaceAudience.Private
+public interface Shipper {
+
+  /**
+   * Called after a batch of rows scanned and set to be returned to client. Any in between cleanup
+   * can be done here.
+   */
+  void shipped() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4d8fab1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index e7a5af4..2f64607 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -488,4 +488,9 @@ public class StoreFileScanner implements KeyValueScanner {
   public Cell getNextIndexedKey() {
     return hfs.getNextIndexedKey();
   }
+
+  @Override
+  public void shipped() throws IOException {
+    this.hfs.shipped();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4d8fab1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index d63ccca..d60087b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -901,5 +901,21 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   public Cell getNextIndexedKey() {
     return this.heap.getNextIndexedKey();
   }
+
+  @Override
+  public void shipped() throws IOException {
+    lock.lock();
+    try {
+      for (KeyValueHeap h : this.heapsForDelayedClose) {
+        h.close();// There wont be further fetch of Cells from these scanners. Just close.
+      }
+      this.heapsForDelayedClose.clear();
+      if (this.heap != null) {
+        this.heap.shipped();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4d8fab1/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
index 4f5b84d..b2ef1bd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
@@ -144,6 +144,11 @@ public class TestCoprocessorInterface {
     public int getBatch() {
       return delegate.getBatch();
     }
+
+    @Override
+    public void shipped() throws IOException {
+      this.delegate.shipped();
+    }
   }
 
   public static class CoprocessorImpl extends BaseRegionObserver {