You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2013/08/20 18:55:46 UTC

svn commit: r1515874 - /hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java

Author: jxiang
Date: Tue Aug 20 16:55:46 2013
New Revision: 1515874

URL: http://svn.apache.org/r1515874
Log:
HBASE-9167 ServerCallable retries just once if timeout is not integer.max

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=1515874&r1=1515873&r2=1515874&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java Tue Aug 20 16:55:46 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.NotServin
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.ipc.RemoteException;
 
 /**
@@ -56,7 +57,9 @@ public abstract class ServerCallable<T> 
   protected HRegionLocation location;
   protected HRegionInterface server;
   protected int callTimeout;
+  protected long globalStartTime;
   protected long startTime, endTime;
+  protected final static int MIN_RPC_TIMEOUT = 2000;
 
   /**
    * @param connection Connection to use.
@@ -109,27 +112,20 @@ public abstract class ServerCallable<T> 
   }
 
   public void beforeCall() {
-    HBaseRPC.setRpcTimeout(this.callTimeout);
-    this.startTime = System.currentTimeMillis();
+    this.startTime = EnvironmentEdgeManager.currentTimeMillis();
+    int remaining = (int)(callTimeout - (this.startTime - this.globalStartTime));
+    if (remaining < MIN_RPC_TIMEOUT) {
+      // If there is no time left, we're trying anyway. It's too late.
+      // 0 means no timeout, and it's not the intent here. So we secure both cases by
+      // resetting to the minimum.
+      remaining = MIN_RPC_TIMEOUT;
+    }
+    HBaseRPC.setRpcTimeout(remaining);
   }
 
   public void afterCall() {
     HBaseRPC.resetRpcTimeout();
-    this.endTime = System.currentTimeMillis();
-  }
-
-  public void shouldRetry(Throwable throwable) throws IOException {
-    if (this.callTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)
-      if (throwable instanceof SocketTimeoutException
-          || (this.endTime - this.startTime > this.callTimeout)) {
-        throw (SocketTimeoutException) (SocketTimeoutException) new SocketTimeoutException(
-            "Call to access row '" + Bytes.toString(row) + "' on table '"
-                + Bytes.toString(tableName)
-                + "' failed on socket timeout exception: " + throwable)
-            .initCause(throwable);
-      } else {
-        this.callTimeout = ((int) (this.endTime - this.startTime));
-      }
+    this.endTime = EnvironmentEdgeManager.currentTimeMillis();
   }
 
   /**
@@ -157,13 +153,15 @@ public abstract class ServerCallable<T> 
       HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
       new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
+    globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
+    long expectedSleep = 0;
+
     for (int tries = 0; tries < numRetries; tries++) {
       try {
         beforeCall();
         connect(tries != 0);
         return call();
       } catch (Throwable t) {
-        shouldRetry(t);
         t = translateException(t);
         if (t instanceof SocketTimeoutException ||
             t instanceof ConnectException ||
@@ -182,16 +180,30 @@ public abstract class ServerCallable<T> 
         }
         RetriesExhaustedException.ThrowableWithExtraContext qt =
           new RetriesExhaustedException.ThrowableWithExtraContext(t,
-            System.currentTimeMillis(), toString());
+            EnvironmentEdgeManager.currentTimeMillis(), toString());
         exceptions.add(qt);
         if (tries == numRetries - 1) {
           throw new RetriesExhaustedException(tries, exceptions);
         }
+        // If the server is dead, we need to wait a little before retrying, to give
+        //  a chance to the regions to be
+        // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time
+        expectedSleep = ConnectionUtils.getPauseTime(pause, tries + 1);
+
+        // If, after the planned sleep, there won't be enough time left, we stop now.
+        long duration = singleCallDuration(expectedSleep);
+        if (duration > this.callTimeout) {
+          throw (SocketTimeoutException) new SocketTimeoutException(
+            "Call to access row '" + Bytes.toString(row) + "' on table '"
+              + Bytes.toString(tableName) + "' failed on timeout. "
+              + " callTimeout=" + this.callTimeout + ", callDuration="
+              + duration).initCause(t);
+        }
       } finally {
         afterCall();
       }
       try {
-        Thread.sleep(ConnectionUtils.getPauseTime(pause, tries));
+        Thread.sleep(expectedSleep);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IOException("Giving up after tries=" + tries, e);
@@ -209,6 +221,7 @@ public abstract class ServerCallable<T> 
    */
   public T withoutRetries()
   throws IOException, RuntimeException {
+    globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
     try {
       beforeCall();
       connect(false);
@@ -225,6 +238,15 @@ public abstract class ServerCallable<T> 
     }
   }
 
+  /**
+   * @param expectedSleep
+   * @return Calculate how long a single call took
+   */
+  private long singleCallDuration(final long expectedSleep) {
+    return (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime)
+      + MIN_RPC_TIMEOUT + expectedSleep;
+  }
+
   private static Throwable translateException(Throwable t) throws IOException {
     if (t instanceof UndeclaredThrowableException) {
       t = t.getCause();