You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2013/08/20 18:55:46 UTC
svn commit: r1515874 -
/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
Author: jxiang
Date: Tue Aug 20 16:55:46 2013
New Revision: 1515874
URL: http://svn.apache.org/r1515874
Log:
HBASE-9167 ServerCallable retries just once if timeout is not integer.max
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=1515874&r1=1515873&r2=1515874&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java Tue Aug 20 16:55:46 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.NotServin
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
/**
@@ -56,7 +57,9 @@ public abstract class ServerCallable<T>
protected HRegionLocation location;
protected HRegionInterface server;
protected int callTimeout;
+ protected long globalStartTime;
protected long startTime, endTime;
+ protected final static int MIN_RPC_TIMEOUT = 2000;
/**
* @param connection Connection to use.
@@ -109,27 +112,20 @@ public abstract class ServerCallable<T>
}
public void beforeCall() {
- HBaseRPC.setRpcTimeout(this.callTimeout);
- this.startTime = System.currentTimeMillis();
+ this.startTime = EnvironmentEdgeManager.currentTimeMillis();
+ int remaining = (int)(callTimeout - (this.startTime - this.globalStartTime));
+ if (remaining < MIN_RPC_TIMEOUT) {
+ // If there is no time left, we're trying anyway. It's too late.
+ // 0 means no timeout, and it's not the intent here. So we secure both cases by
+ // resetting to the minimum.
+ remaining = MIN_RPC_TIMEOUT;
+ }
+ HBaseRPC.setRpcTimeout(remaining);
}
public void afterCall() {
HBaseRPC.resetRpcTimeout();
- this.endTime = System.currentTimeMillis();
- }
-
- public void shouldRetry(Throwable throwable) throws IOException {
- if (this.callTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)
- if (throwable instanceof SocketTimeoutException
- || (this.endTime - this.startTime > this.callTimeout)) {
- throw (SocketTimeoutException) (SocketTimeoutException) new SocketTimeoutException(
- "Call to access row '" + Bytes.toString(row) + "' on table '"
- + Bytes.toString(tableName)
- + "' failed on socket timeout exception: " + throwable)
- .initCause(throwable);
- } else {
- this.callTimeout = ((int) (this.endTime - this.startTime));
- }
+ this.endTime = EnvironmentEdgeManager.currentTimeMillis();
}
/**
@@ -157,13 +153,15 @@ public abstract class ServerCallable<T>
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
+ globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
+ long expectedSleep = 0;
+
for (int tries = 0; tries < numRetries; tries++) {
try {
beforeCall();
connect(tries != 0);
return call();
} catch (Throwable t) {
- shouldRetry(t);
t = translateException(t);
if (t instanceof SocketTimeoutException ||
t instanceof ConnectException ||
@@ -182,16 +180,30 @@ public abstract class ServerCallable<T>
}
RetriesExhaustedException.ThrowableWithExtraContext qt =
new RetriesExhaustedException.ThrowableWithExtraContext(t,
- System.currentTimeMillis(), toString());
+ EnvironmentEdgeManager.currentTimeMillis(), toString());
exceptions.add(qt);
if (tries == numRetries - 1) {
throw new RetriesExhaustedException(tries, exceptions);
}
+ // If the server is dead, we need to wait a little before retrying, to give
+ // a chance to the regions to be reassigned.
+ // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time
+ expectedSleep = ConnectionUtils.getPauseTime(pause, tries + 1);
+
+ // If, after the planned sleep, there won't be enough time left, we stop now.
+ long duration = singleCallDuration(expectedSleep);
+ if (duration > this.callTimeout) {
+ throw (SocketTimeoutException) new SocketTimeoutException(
+ "Call to access row '" + Bytes.toString(row) + "' on table '"
+ + Bytes.toString(tableName) + "' failed on timeout. "
+ + " callTimeout=" + this.callTimeout + ", callDuration="
+ + duration).initCause(t);
+ }
} finally {
afterCall();
}
try {
- Thread.sleep(ConnectionUtils.getPauseTime(pause, tries));
+ Thread.sleep(expectedSleep);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Giving up after tries=" + tries, e);
@@ -209,6 +221,7 @@ public abstract class ServerCallable<T>
*/
public T withoutRetries()
throws IOException, RuntimeException {
+ globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
try {
beforeCall();
connect(false);
@@ -225,6 +238,15 @@ public abstract class ServerCallable<T>
}
}
+ /**
+ * @param expectedSleep the pause planned before the next attempt, in milliseconds
+ * @return time elapsed since globalStartTime, plus MIN_RPC_TIMEOUT and expectedSleep
+ */
+ private long singleCallDuration(final long expectedSleep) {
+ return (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime)
+ + MIN_RPC_TIMEOUT + expectedSleep;
+ }
+
private static Throwable translateException(Throwable t) throws IOException {
if (t instanceof UndeclaredThrowableException) {
t = t.getCause();