You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/04/24 20:18:19 UTC
svn commit: r1471580 - in
/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver:
HRegionServer.java RegionScanner.java
Author: liyin
Date: Wed Apr 24 18:18:18 2013
New Revision: 1471580
URL: http://svn.apache.org/r1471580
Log:
[HBASE-8114][89-fb] Interrupt the RS RPC call when the client connection has been closed
Author: adela
Summary:
my changes were overwritten by a rebase that I did and I
figured out that the logic is not in trunk, fixing it
Test Plan:
mr unit tests finished successfully. Since this is hard to test with a unit test I was testing with the following scenario:
- Instead of testing whether the client has closed the connection I put a very small timeout ~100ms.
- I start a scan over a table. - The scan times out because 100ms expired.
Reviewers: manukranthk, liyintang
Reviewed By: manukranthk
CC: hbase-eng@
Differential Revision: https://phabricator.fb.com/D783731
Task ID: 2044405
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1471580&r1=1471579&r2=1471580&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Apr 24 18:18:18 2013
@@ -627,7 +627,11 @@ public class HRegionServer implements HR
* @return
*/
public static boolean isCurrentConnectionClosed() {
- return callContext.get().getConnection().getSocket().isClosed();
+ // this is checked just because of the unit tests
+ if (callContext.get() != null) {
+ return callContext.get().getConnection().getSocket().isClosed();
+ }
+ return false;
}
/**
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java?rev=1471580&r1=1471579&r2=1471580&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java Wed Apr 24 18:18:18 2013
@@ -30,6 +30,9 @@ import java.util.concurrent.ExecutionExc
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
@@ -49,6 +52,7 @@ import org.apache.hadoop.hbase.util.Byte
*/
public class RegionScanner implements InternalScanner {
//Package local for testability
+ public static final Log LOG = LogFactory.getLog(RegionScanner.class);
KeyValueHeap storeHeap = null;
private final byte [] stopRow;
private Filter filter;
@@ -197,7 +201,7 @@ public class RegionScanner implements In
getOriginalScan().setCurrentPartialResponseSize(0);
int maxResponseSize = getOriginalScan().getMaxResponseSize();
do {
- moreRows = nextInternal(tmpList, limit, metric, null);
+ moreRows = nextInternal(tmpList, limit, metric, null, true);
if (!tmpList.isEmpty()) {
currentNbRows++;
if (outResults != null) {
@@ -301,10 +305,10 @@ public class RegionScanner implements In
if (outResults.isEmpty()) {
// Usually outResults is empty. This is true when next is called
// to handle scan or get operation.
- returnResult = nextInternal(outResults, limit, metric, kvContext);
+ returnResult = nextInternal(outResults, limit, metric, kvContext, false);
} else {
List<KeyValue> tmpList = new ArrayList<KeyValue>();
- returnResult = nextInternal(tmpList, limit, metric, kvContext);
+ returnResult = nextInternal(tmpList, limit, metric, kvContext, false);
outResults.addAll(tmpList);
}
rowReadCnt.incrementAndGet();
@@ -340,7 +344,7 @@ public class RegionScanner implements In
* @param results empty list in which results will be stored
*/
private boolean nextInternal(List<KeyValue> results, int limit, String metric,
- KeyValueContext kvContext)
+ KeyValueContext kvContext, boolean prefetch)
throws IOException {
if (!results.isEmpty()) {
@@ -350,6 +354,11 @@ public class RegionScanner implements In
boolean partialRow = getOriginalScan().isPartialRow();
long maxResponseSize = getOriginalScan().getMaxResponseSize();
while (true) {
+ if (!prefetch && HRegionServer.isCurrentConnectionClosed()) {
+ HRegion.incrNumericMetric(HConstants.SERVER_INTERRUPTED_CALLS_KEY, 1);
+ LOG.error(HConstants.CLIENT_SOCKED_CLOSED_EXC_MSG);
+ throw new IOException(HConstants.CLIENT_SOCKED_CLOSED_EXC_MSG);
+ }
byte [] currentRow = peekRow();
if (isStopRow(currentRow)) {
if (filter != null && filter.hasFilterRow()) {
@@ -366,6 +375,12 @@ public class RegionScanner implements In
} else {
byte [] nextRow;
do {
+ if (!prefetch && HRegionServer.isCurrentConnectionClosed()) {
+ HRegion.incrNumericMetric(HConstants.SERVER_INTERRUPTED_CALLS_KEY,
+ 1);
+ LOG.error(HConstants.CLIENT_SOCKED_CLOSED_EXC_MSG);
+ throw new IOException(HConstants.CLIENT_SOCKED_CLOSED_EXC_MSG);
+ }
this.storeHeap.next(results, limit - results.size(), metric, kvContext);
if (limit > 0 && results.size() == limit) {
if (this.filter != null && filter.hasFilterRow())