You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/11/02 20:15:33 UTC

svn commit: r1405107 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/client/ClientScanner.java test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java

Author: stack
Date: Fri Nov  2 19:15:32 2012
New Revision: 1405107

URL: http://svn.apache.org/viewvc?rev=1405107&view=rev
Log:
HBASE-7070 Scanner may retry forever after HBASE-5974

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java?rev=1405107&r1=1405106&r2=1405107&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java Fri Nov  2 19:15:32 2012
@@ -266,6 +266,7 @@ public class ClientScanner extends Abstr
         // This flag is set when we want to skip the result returned.  We do
         // this when we reset scanner because it split under us.
         boolean skipFirst = false;
+        boolean retryAfterOutOfOrderException  = true;
         do {
           try {
             if (skipFirst) {
@@ -280,6 +281,7 @@ public class ClientScanner extends Abstr
             // returns an empty array if scanning is to go on and we've just
             // exhausted current region.
             values = callable.withRetries();
+            retryAfterOutOfOrderException  = true;
           } catch (DoNotRetryIOException e) {
             if (e instanceof UnknownScannerException) {
               long timeout = lastNext + scannerTimeout;
@@ -310,6 +312,14 @@ public class ClientScanner extends Abstr
               // invocation.
               skipFirst = true;
             }
+            if (e instanceof OutOfOrderScannerNextException) {
+              if (retryAfterOutOfOrderException) {
+                retryAfterOutOfOrderException = false;
+              } else {
+                throw new DoNotRetryIOException("Failed after retry"
+                    + ", it could be cause by rpc timeout", e);
+              }
+            }
             // Clear region
             this.currentRegion = null;
             callable = null;

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java?rev=1405107&r1=1405106&r2=1405107&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java Fri Nov  2 19:15:32 2012
@@ -21,6 +21,8 @@ import static org.junit.Assert.assertTru
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -43,17 +45,20 @@ import com.google.protobuf.ServiceExcept
  */
 @Category(MediumTests.class)
 public class TestClientScannerRPCTimeout {
+  final Log LOG = LogFactory.getLog(getClass());
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static final byte[] FAMILY = Bytes.toBytes("testFamily");
   private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
   private static final byte[] VALUE = Bytes.toBytes("testValue");
   private static final int rpcTimeout = 2 * 1000;
+  private static final int CLIENT_RETRIES_NUMBER = 3;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
     conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER);
     TEST_UTIL.startMiniCluster(1);
   }
 
@@ -86,6 +91,20 @@ public class TestClientScannerRPCTimeout
     result = scanner.next();
     assertTrue("Expected row: row-3", Bytes.equals(r3, result.getRow()));
     scanner.close();
+
+    // test the case where the RPC always times out
+    scanner = ht.getScanner(scan);
+    RegionServerWithScanTimeout.sleepAlways = true;
+    RegionServerWithScanTimeout.tryNumber = 0;
+    try {
+      result = scanner.next();
+    } catch (IOException ioe) {
+      // catch the exception after max retry number
+      LOG.info("Failed after maximal attempts=" + CLIENT_RETRIES_NUMBER, ioe);
+    }
+    assertTrue("Expected maximal try number=" + CLIENT_RETRIES_NUMBER
+        + ", actual =" + RegionServerWithScanTimeout.tryNumber,
+        RegionServerWithScanTimeout.tryNumber <= CLIENT_RETRIES_NUMBER);
   }
 
   private void putToTable(HTable ht, byte[] rowkey) throws IOException {
@@ -98,6 +117,8 @@ public class TestClientScannerRPCTimeout
     private long tableScannerId;
     private boolean slept;
     private static long seqNoToSleepOn = -1;
+    private static boolean sleepAlways = false;
+    private static int tryNumber = 0;
 
     public RegionServerWithScanTimeout(Configuration conf) throws IOException, InterruptedException {
       super(conf);
@@ -107,15 +128,20 @@ public class TestClientScannerRPCTimeout
     public ScanResponse scan(final RpcController controller, final ScanRequest request)
         throws ServiceException {
       if (request.hasScannerId()) {
-        if (!slept && this.tableScannerId == request.getScannerId()
-            && seqNoToSleepOn == request.getNextCallSeq()) {
+        ScanResponse scanResponse = super.scan(controller, request);
+        if (this.tableScannerId == request.getScannerId() && 
+            (sleepAlways || (!slept && seqNoToSleepOn == request.getNextCallSeq()))) {
           try {
             Thread.sleep(rpcTimeout + 500);
           } catch (InterruptedException e) {
           }
           slept = true;
+          tryNumber++;
+          if (tryNumber > 2 * CLIENT_RETRIES_NUMBER) {
+            sleepAlways = false;
+          }
         }
-        return super.scan(controller, request);
+        return scanResponse;
       } else {
         ScanResponse scanRes = super.scan(controller, request);
         String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());