You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/04/24 20:18:19 UTC

svn commit: r1471580 - in /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver: HRegionServer.java RegionScanner.java

Author: liyin
Date: Wed Apr 24 18:18:18 2013
New Revision: 1471580

URL: http://svn.apache.org/r1471580
Log:
[HBASE-8114][89-fb] Interrupt the RS RPC call when the client connection has been closed

Author: adela

Summary:
My changes were overwritten by a rebase that I did, and I
figured out that the logic is not in trunk; this commit fixes it.

Test Plan:
mr unit tests finished successfully. Since this is hard to test with a unit test, I tested it with the following scenario:
- Instead of testing whether the client has closed the connection, I put in a very small timeout (~100ms).
- I start a scan over a table.
- The scan times out because the 100ms timeout expired.

Reviewers: manukranthk, liyintang

Reviewed By: manukranthk

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D783731

Task ID: 2044405

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1471580&r1=1471579&r2=1471580&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Apr 24 18:18:18 2013
@@ -627,7 +627,11 @@ public class HRegionServer implements HR
    * @return
    */
   public static boolean isCurrentConnectionClosed() {
-    return callContext.get().getConnection().getSocket().isClosed();
+    // this is checked just because of the unit tests
+    if (callContext.get() != null) {
+      return callContext.get().getConnection().getSocket().isClosed();
+    }
+    return false;
   }
 
   /**

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java?rev=1471580&r1=1471579&r2=1471580&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java Wed Apr 24 18:18:18 2013
@@ -30,6 +30,9 @@ import java.util.concurrent.ExecutionExc
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
@@ -49,6 +52,7 @@ import org.apache.hadoop.hbase.util.Byte
  */
 public class RegionScanner implements InternalScanner {
 //Package local for testability
+  public static final Log LOG = LogFactory.getLog(RegionScanner.class);
   KeyValueHeap storeHeap = null;
   private final byte [] stopRow;
   private Filter filter;
@@ -197,7 +201,7 @@ public class RegionScanner implements In
         getOriginalScan().setCurrentPartialResponseSize(0);
         int maxResponseSize = getOriginalScan().getMaxResponseSize();
         do {
-          moreRows = nextInternal(tmpList, limit, metric, null);
+          moreRows = nextInternal(tmpList, limit, metric, null, true);
           if (!tmpList.isEmpty()) {
             currentNbRows++;
             if (outResults != null) {
@@ -301,10 +305,10 @@ public class RegionScanner implements In
     if (outResults.isEmpty()) {
        // Usually outResults is empty. This is true when next is called
        // to handle scan or get operation.
-      returnResult = nextInternal(outResults, limit, metric, kvContext);
+      returnResult = nextInternal(outResults, limit, metric, kvContext, false);
     } else {
       List<KeyValue> tmpList = new ArrayList<KeyValue>();
-      returnResult = nextInternal(tmpList, limit, metric, kvContext);
+      returnResult = nextInternal(tmpList, limit, metric, kvContext, false);
       outResults.addAll(tmpList);
     }
     rowReadCnt.incrementAndGet();
@@ -340,7 +344,7 @@ public class RegionScanner implements In
    * @param results empty list in which results will be stored
    */
   private boolean nextInternal(List<KeyValue> results, int limit, String metric,
-      KeyValueContext kvContext)
+      KeyValueContext kvContext, boolean prefetch)
       throws IOException {
 
     if (!results.isEmpty()) {
@@ -350,6 +354,11 @@ public class RegionScanner implements In
     boolean partialRow = getOriginalScan().isPartialRow();
     long maxResponseSize = getOriginalScan().getMaxResponseSize();
     while (true) {
+      if (!prefetch && HRegionServer.isCurrentConnectionClosed()) {
+        HRegion.incrNumericMetric(HConstants.SERVER_INTERRUPTED_CALLS_KEY, 1);
+        LOG.error(HConstants.CLIENT_SOCKED_CLOSED_EXC_MSG);
+        throw new IOException(HConstants.CLIENT_SOCKED_CLOSED_EXC_MSG);
+      }
       byte [] currentRow = peekRow();
       if (isStopRow(currentRow)) {
         if (filter != null && filter.hasFilterRow()) {
@@ -366,6 +375,12 @@ public class RegionScanner implements In
       } else {
         byte [] nextRow;
         do {
+          if (!prefetch && HRegionServer.isCurrentConnectionClosed()) {
+            HRegion.incrNumericMetric(HConstants.SERVER_INTERRUPTED_CALLS_KEY,
+                1);
+            LOG.error(HConstants.CLIENT_SOCKED_CLOSED_EXC_MSG);
+            throw new IOException(HConstants.CLIENT_SOCKED_CLOSED_EXC_MSG);
+          }
           this.storeHeap.next(results, limit - results.size(), metric, kvContext);
           if (limit > 0 && results.size() == limit) {
             if (this.filter != null && filter.hasFilterRow())