You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2013/11/12 01:00:25 UTC
svn commit: r1540887 - in /hbase/branches/0.96/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
Author: jxiang
Date: Tue Nov 12 00:00:24 2013
New Revision: 1540887
URL: http://svn.apache.org/r1540887
Log:
HBASE-9926 Scanner doesn't check if a region is available
Modified:
hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1540887&r1=1540886&r2=1540887&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Nov 12 00:00:24 2013
@@ -2625,15 +2625,6 @@ public class HRegionServer implements Cl
/*
* @param t
*
- * @return Make <code>t</code> an IOE if it isn't already.
- */
- protected IOException convertThrowableToIOE(final Throwable t) {
- return convertThrowableToIOE(t, null);
- }
-
- /*
- * @param t
- *
* @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
*
* @return Make <code>t</code> an IOE if it isn't already.
@@ -2688,15 +2679,16 @@ public class HRegionServer implements Cl
return this.fsOk;
}
- protected long addScanner(RegionScanner s) throws LeaseStillHeldException {
+ protected long addScanner(RegionScanner s, HRegion r) throws LeaseStillHeldException {
long scannerId = -1;
while (true) {
scannerId = Math.abs(rand.nextLong() << 24) ^ startcode;
String scannerName = String.valueOf(scannerId);
- RegionScannerHolder existing = scanners.putIfAbsent(scannerName, new RegionScannerHolder(s));
+ RegionScannerHolder existing =
+ scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
if (existing == null) {
this.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
- new ScannerListener(scannerName));
+ new ScannerListener(scannerName));
break;
}
}
@@ -2949,191 +2941,192 @@ public class HRegionServer implements Cl
}
requestCount.increment();
- try {
- int ttl = 0;
- HRegion region = null;
- RegionScanner scanner = null;
- RegionScannerHolder rsh = null;
- boolean moreResults = true;
- boolean closeScanner = false;
- ScanResponse.Builder builder = ScanResponse.newBuilder();
- if (request.hasCloseScanner()) {
- closeScanner = request.getCloseScanner();
- }
- int rows = 1;
- if (request.hasNumberOfRows()) {
- rows = request.getNumberOfRows();
+ int ttl = 0;
+ HRegion region = null;
+ RegionScanner scanner = null;
+ RegionScannerHolder rsh = null;
+ boolean moreResults = true;
+ boolean closeScanner = false;
+ ScanResponse.Builder builder = ScanResponse.newBuilder();
+ if (request.hasCloseScanner()) {
+ closeScanner = request.getCloseScanner();
+ }
+ int rows = 1;
+ if (request.hasNumberOfRows()) {
+ rows = request.getNumberOfRows();
+ }
+ if (request.hasScannerId()) {
+ rsh = scanners.get(scannerName);
+ if (rsh == null) {
+ LOG.info("Client tried to access missing scanner " + scannerName);
+ throw new UnknownScannerException(
+ "Name: " + scannerName + ", already closed?");
+ }
+ scanner = rsh.s;
+ HRegionInfo hri = scanner.getRegionInfo();
+ region = getRegion(hri.getRegionName());
+ if (region != rsh.r) { // Yes, should be the same instance
+ throw new NotServingRegionException("Region was re-opened after the scanner"
+ + scannerName + " was created: " + hri.getRegionNameAsString());
}
- if (request.hasScannerId()) {
- rsh = scanners.get(scannerName);
- if (rsh == null) {
- LOG.info("Client tried to access missing scanner " + scannerName);
- throw new UnknownScannerException(
- "Name: " + scannerName + ", already closed?");
- }
- scanner = rsh.s;
- region = getRegion(scanner.getRegionInfo().getRegionName());
- } else {
- region = getRegion(request.getRegion());
- ClientProtos.Scan protoScan = request.getScan();
- boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
- Scan scan = ProtobufUtil.toScan(protoScan);
- // if the request doesn't set this, get the default region setting.
- if (!isLoadingCfsOnDemandSet) {
- scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
- }
- scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
- region.prepareScanner(scan);
- if (region.getCoprocessorHost() != null) {
- scanner = region.getCoprocessorHost().preScannerOpen(scan);
- }
- if (scanner == null) {
- scanner = region.getScanner(scan);
- }
- if (region.getCoprocessorHost() != null) {
- scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
- }
- scannerId = addScanner(scanner);
- scannerName = String.valueOf(scannerId);
- ttl = this.scannerLeaseTimeoutPeriod;
+ } else {
+ region = getRegion(request.getRegion());
+ ClientProtos.Scan protoScan = request.getScan();
+ boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
+ Scan scan = ProtobufUtil.toScan(protoScan);
+ // if the request doesn't set this, get the default region setting.
+ if (!isLoadingCfsOnDemandSet) {
+ scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
+ }
+ scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
+ region.prepareScanner(scan);
+ if (region.getCoprocessorHost() != null) {
+ scanner = region.getCoprocessorHost().preScannerOpen(scan);
+ }
+ if (scanner == null) {
+ scanner = region.getScanner(scan);
}
+ if (region.getCoprocessorHost() != null) {
+ scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
+ }
+ scannerId = addScanner(scanner, region);
+ scannerName = String.valueOf(scannerId);
+ ttl = this.scannerLeaseTimeoutPeriod;
+ }
- if (rows > 0) {
- // if nextCallSeq does not match throw Exception straight away. This needs to be
- // performed even before checking of Lease.
- // See HBASE-5974
- if (request.hasNextCallSeq()) {
- if (rsh == null) {
- rsh = scanners.get(scannerName);
- }
- if (rsh != null) {
- if (request.getNextCallSeq() != rsh.nextCallSeq) {
- throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
- + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
- "; request=" + TextFormat.shortDebugString(request));
- }
- // Increment the nextCallSeq value which is the next expected from client.
- rsh.nextCallSeq++;
+ if (rows > 0) {
+ // if nextCallSeq does not match throw Exception straight away. This needs to be
+ // performed even before checking of Lease.
+ // See HBASE-5974
+ if (request.hasNextCallSeq()) {
+ if (rsh == null) {
+ rsh = scanners.get(scannerName);
+ }
+ if (rsh != null) {
+ if (request.getNextCallSeq() != rsh.nextCallSeq) {
+ throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
+ + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
+ "; request=" + TextFormat.shortDebugString(request));
}
+ // Increment the nextCallSeq value which is the next expected from client.
+ rsh.nextCallSeq++;
}
- try {
- // Remove lease while its being processed in server; protects against case
- // where processing of request takes > lease expiration time.
- lease = leases.removeLease(scannerName);
- List<Result> results = new ArrayList<Result>(rows);
- long currentScanResultSize = 0;
+ }
+ try {
+ // Remove lease while its being processed in server; protects against case
+ // where processing of request takes > lease expiration time.
+ lease = leases.removeLease(scannerName);
+ List<Result> results = new ArrayList<Result>(rows);
+ long currentScanResultSize = 0;
- boolean done = false;
- // Call coprocessor. Get region info from scanner.
- if (region != null && region.getCoprocessorHost() != null) {
- Boolean bypass = region.getCoprocessorHost().preScannerNext(
- scanner, results, rows);
- if (!results.isEmpty()) {
- for (Result r : results) {
- if (maxScannerResultSize < Long.MAX_VALUE){
- for (Cell kv : r.rawCells()) {
- // TODO
- currentScanResultSize += KeyValueUtil.ensureKeyValue(kv).heapSize();
- }
+ boolean done = false;
+ // Call coprocessor. Get region info from scanner.
+ if (region != null && region.getCoprocessorHost() != null) {
+ Boolean bypass = region.getCoprocessorHost().preScannerNext(
+ scanner, results, rows);
+ if (!results.isEmpty()) {
+ for (Result r : results) {
+ if (maxScannerResultSize < Long.MAX_VALUE){
+ for (Cell kv : r.rawCells()) {
+ // TODO
+ currentScanResultSize += KeyValueUtil.ensureKeyValue(kv).heapSize();
}
}
}
- if (bypass != null && bypass.booleanValue()) {
- done = true;
- }
}
+ if (bypass != null && bypass.booleanValue()) {
+ done = true;
+ }
+ }
- if (!done) {
- long maxResultSize = scanner.getMaxResultSize();
- if (maxResultSize <= 0) {
- maxResultSize = maxScannerResultSize;
- }
- List<Cell> values = new ArrayList<Cell>();
- MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
- region.startRegionOperation(Operation.SCAN);
- try {
- int i = 0;
- synchronized(scanner) {
- for (; i < rows
- && currentScanResultSize < maxResultSize; i++) {
- // Collect values to be returned here
- boolean moreRows = scanner.nextRaw(values);
- if (!values.isEmpty()) {
- if (maxScannerResultSize < Long.MAX_VALUE){
- for (Cell kv : values) {
- currentScanResultSize += KeyValueUtil.ensureKeyValue(kv).heapSize();
- }
+ if (!done) {
+ long maxResultSize = scanner.getMaxResultSize();
+ if (maxResultSize <= 0) {
+ maxResultSize = maxScannerResultSize;
+ }
+ List<Cell> values = new ArrayList<Cell>();
+ MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
+ region.startRegionOperation(Operation.SCAN);
+ try {
+ int i = 0;
+ synchronized(scanner) {
+ for (; i < rows
+ && currentScanResultSize < maxResultSize; i++) {
+ // Collect values to be returned here
+ boolean moreRows = scanner.nextRaw(values);
+ if (!values.isEmpty()) {
+ if (maxScannerResultSize < Long.MAX_VALUE){
+ for (Cell kv : values) {
+ currentScanResultSize += KeyValueUtil.ensureKeyValue(kv).heapSize();
}
- results.add(Result.create(values));
}
- if (!moreRows) {
- break;
- }
- values.clear();
+ results.add(Result.create(values));
+ }
+ if (!moreRows) {
+ break;
}
+ values.clear();
}
- region.readRequestsCount.add(i);
- } finally {
- region.closeRegionOperation();
- }
-
- // coprocessor postNext hook
- if (region != null && region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
}
+ region.readRequestsCount.add(i);
+ } finally {
+ region.closeRegionOperation();
}
- // If the scanner's filter - if any - is done with the scan
- // and wants to tell the client to stop the scan. This is done by passing
- // a null result, and setting moreResults to false.
- if (scanner.isFilterDone() && results.isEmpty()) {
- moreResults = false;
- results = null;
- } else {
- addResults(builder, results, controller);
- }
- } finally {
- // We're done. On way out re-add the above removed lease.
- // Adding resets expiration time on lease.
- if (scanners.containsKey(scannerName)) {
- if (lease != null) leases.addLease(lease);
- ttl = this.scannerLeaseTimeoutPeriod;
+ // coprocessor postNext hook
+ if (region != null && region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
}
}
- }
- if (!moreResults || closeScanner) {
- ttl = 0;
- moreResults = false;
- if (region != null && region.getCoprocessorHost() != null) {
- if (region.getCoprocessorHost().preScannerClose(scanner)) {
- return builder.build(); // bypass
- }
+ // If the scanner's filter - if any - is done with the scan
+ // and wants to tell the client to stop the scan. This is done by passing
+ // a null result, and setting moreResults to false.
+ if (scanner.isFilterDone() && results.isEmpty()) {
+ moreResults = false;
+ results = null;
+ } else {
+ addResults(builder, results, controller);
}
- rsh = scanners.remove(scannerName);
- if (rsh != null) {
- scanner = rsh.s;
- scanner.close();
- leases.cancelLease(scannerName);
- if (region != null && region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postScannerClose(scanner);
- }
+ } finally {
+ // We're done. On way out re-add the above removed lease.
+ // Adding resets expiration time on lease.
+ if (scanners.containsKey(scannerName)) {
+ if (lease != null) leases.addLease(lease);
+ ttl = this.scannerLeaseTimeoutPeriod;
}
}
+ }
- if (ttl > 0) {
- builder.setTtl(ttl);
+ if (!moreResults || closeScanner) {
+ ttl = 0;
+ moreResults = false;
+ if (region != null && region.getCoprocessorHost() != null) {
+ if (region.getCoprocessorHost().preScannerClose(scanner)) {
+ return builder.build(); // bypass
+ }
}
- builder.setScannerId(scannerId);
- builder.setMoreResults(moreResults);
- return builder.build();
- } catch (Throwable t) {
- if (scannerName != null && t instanceof NotServingRegionException) {
- scanners.remove(scannerName);
+ rsh = scanners.remove(scannerName);
+ if (rsh != null) {
+ scanner = rsh.s;
+ scanner.close();
+ leases.cancelLease(scannerName);
+ if (region != null && region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().postScannerClose(scanner);
+ }
}
- throw convertThrowableToIOE(cleanup(t));
}
+
+ if (ttl > 0) {
+ builder.setTtl(ttl);
+ }
+ builder.setScannerId(scannerId);
+ builder.setMoreResults(moreResults);
+ return builder.build();
} catch (IOException ie) {
+ if (scannerName != null && ie instanceof NotServingRegionException) {
+ scanners.remove(scannerName);
+ }
throw new ServiceException(ie);
}
}
@@ -4300,9 +4293,11 @@ public class HRegionServer implements Cl
private static class RegionScannerHolder {
private RegionScanner s;
private long nextCallSeq = 0L;
+ private HRegion r;
- public RegionScannerHolder(RegionScanner s) {
+ public RegionScannerHolder(RegionScanner s, HRegion r) {
this.s = s;
+ this.r = r;
}
}
Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java?rev=1540887&r1=1540886&r2=1540887&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java (original)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java Tue Nov 12 00:00:24 2013
@@ -26,12 +26,23 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.master.RegionStates;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -424,6 +435,85 @@ public class TestScannersFromClientSide
"Testing offset + multiple CFs + maxResults");
}
+ /**
+ * Test from client side for scan while the region is reopened
+ * on the same region server.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testScanOnReopenedRegion() throws Exception {
+ byte [] TABLE = Bytes.toBytes("testScanOnReopenedRegion");
+ byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2);
+
+ HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
+
+ Put put;
+ Scan scan;
+ Result result;
+ ResultScanner scanner;
+ boolean toLog = false;
+ List<Cell> kvListExp;
+
+ // table: row, family, c0:0, c1:1
+ put = new Put(ROW);
+ for (int i=0; i < QUALIFIERS.length; i++) {
+ KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE);
+ put.add(kv);
+ }
+ ht.put(put);
+
+ scan = new Scan(ROW);
+ scanner = ht.getScanner(scan);
+
+ HRegionLocation loc = ht.getRegionLocation(ROW);
+ HRegionInfo hri = loc.getRegionInfo();
+ MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+ byte[] regionName = hri.getRegionName();
+ int i = cluster.getServerWith(regionName);
+ HRegionServer rs = cluster.getRegionServer(i);
+ ProtobufUtil.closeRegion(rs, regionName, false);
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ long timeOut = 300000;
+ while (true) {
+ if (rs.getOnlineRegion(regionName) == null) {
+ break;
+ }
+ assertTrue("Timed out in closing the testing region",
+ EnvironmentEdgeManager.currentTimeMillis() < startTime + timeOut);
+ Thread.sleep(500);
+ }
+
+ // Now open the region again.
+ ZooKeeperWatcher zkw = TEST_UTIL.getZooKeeperWatcher();
+ try {
+ HMaster master = cluster.getMaster();
+ RegionStates states = master.getAssignmentManager().getRegionStates();
+ states.regionOffline(hri);
+ states.updateRegionState(hri, State.OPENING);
+ ZKAssign.createNodeOffline(zkw, hri, loc.getServerName());
+ ProtobufUtil.openRegion(rs, hri);
+ startTime = EnvironmentEdgeManager.currentTimeMillis();
+ while (true) {
+ if (rs.getOnlineRegion(regionName) != null) {
+ break;
+ }
+ assertTrue("Timed out in open the testing region",
+ EnvironmentEdgeManager.currentTimeMillis() < startTime + timeOut);
+ Thread.sleep(500);
+ }
+ } finally {
+ ZKAssign.deleteNodeFailSilent(zkw, hri);
+ }
+
+ // c0:0, c1:1
+ kvListExp = new ArrayList<Cell>();
+ kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[0], 0, VALUE));
+ kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[1], 1, VALUE));
+ result = scanner.next();
+ verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region");
+ }
+
static void verifyResult(Result result, List<Cell> expKvList, boolean toLog,
String msg) {