You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/11/02 20:15:33 UTC
svn commit: r1405107 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/client/ClientScanner.java
test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java
Author: stack
Date: Fri Nov 2 19:15:32 2012
New Revision: 1405107
URL: http://svn.apache.org/viewvc?rev=1405107&view=rev
Log:
HBASE-7070 Scanner may retry forever after HBASE-5974
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java?rev=1405107&r1=1405106&r2=1405107&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java Fri Nov 2 19:15:32 2012
@@ -266,6 +266,7 @@ public class ClientScanner extends Abstr
// This flag is set when we want to skip the result returned. We do
// this when we reset scanner because it split under us.
boolean skipFirst = false;
+ boolean retryAfterOutOfOrderException = true;
do {
try {
if (skipFirst) {
@@ -280,6 +281,7 @@ public class ClientScanner extends Abstr
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
values = callable.withRetries();
+ retryAfterOutOfOrderException = true;
} catch (DoNotRetryIOException e) {
if (e instanceof UnknownScannerException) {
long timeout = lastNext + scannerTimeout;
@@ -310,6 +312,14 @@ public class ClientScanner extends Abstr
// invocation.
skipFirst = true;
}
+ if (e instanceof OutOfOrderScannerNextException) {
+ if (retryAfterOutOfOrderException) {
+ retryAfterOutOfOrderException = false;
+ } else {
+ throw new DoNotRetryIOException("Failed after retry"
+ + ", it could be cause by rpc timeout", e);
+ }
+ }
// Clear region
this.currentRegion = null;
callable = null;
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java?rev=1405107&r1=1405106&r2=1405107&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java Fri Nov 2 19:15:32 2012
@@ -21,6 +21,8 @@ import static org.junit.Assert.assertTru
import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@@ -43,17 +45,20 @@ import com.google.protobuf.ServiceExcept
*/
@Category(MediumTests.class)
public class TestClientScannerRPCTimeout {
+ final Log LOG = LogFactory.getLog(getClass());
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final byte[] FAMILY = Bytes.toBytes("testFamily");
private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
private static final byte[] VALUE = Bytes.toBytes("testValue");
private static final int rpcTimeout = 2 * 1000;
+ private static final int CLIENT_RETRIES_NUMBER = 3;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
+ conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER);
TEST_UTIL.startMiniCluster(1);
}
@@ -86,6 +91,20 @@ public class TestClientScannerRPCTimeout
result = scanner.next();
assertTrue("Expected row: row-3", Bytes.equals(r3, result.getRow()));
scanner.close();
+
+ // test the case where the RPC always times out
+ scanner = ht.getScanner(scan);
+ RegionServerWithScanTimeout.sleepAlways = true;
+ RegionServerWithScanTimeout.tryNumber = 0;
+ try {
+ result = scanner.next();
+ } catch (IOException ioe) {
+ // catch the exception after max retry number
+ LOG.info("Failed after maximal attempts=" + CLIENT_RETRIES_NUMBER, ioe);
+ }
+ assertTrue("Expected maximal try number=" + CLIENT_RETRIES_NUMBER
+ + ", actual =" + RegionServerWithScanTimeout.tryNumber,
+ RegionServerWithScanTimeout.tryNumber <= CLIENT_RETRIES_NUMBER);
}
private void putToTable(HTable ht, byte[] rowkey) throws IOException {
@@ -98,6 +117,8 @@ public class TestClientScannerRPCTimeout
private long tableScannerId;
private boolean slept;
private static long seqNoToSleepOn = -1;
+ private static boolean sleepAlways = false;
+ private static int tryNumber = 0;
public RegionServerWithScanTimeout(Configuration conf) throws IOException, InterruptedException {
super(conf);
@@ -107,15 +128,20 @@ public class TestClientScannerRPCTimeout
public ScanResponse scan(final RpcController controller, final ScanRequest request)
throws ServiceException {
if (request.hasScannerId()) {
- if (!slept && this.tableScannerId == request.getScannerId()
- && seqNoToSleepOn == request.getNextCallSeq()) {
+ ScanResponse scanResponse = super.scan(controller, request);
+ if (this.tableScannerId == request.getScannerId() &&
+ (sleepAlways || (!slept && seqNoToSleepOn == request.getNextCallSeq()))) {
try {
Thread.sleep(rpcTimeout + 500);
} catch (InterruptedException e) {
}
slept = true;
+ tryNumber++;
+ if (tryNumber > 2 * CLIENT_RETRIES_NUMBER) {
+ sleepAlways = false;
+ }
}
- return super.scan(controller, request);
+ return scanResponse;
} else {
ScanResponse scanRes = super.scan(controller, request);
String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());