You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:06:58 UTC
svn commit: r1181419 - in
/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase:
client/HConnectionManager.java ipc/HBaseClient.java ipc/HBaseRPC.java
regionserver/HRegionServer.java
Author: nspiegelberg
Date: Tue Oct 11 02:06:57 2011
New Revision: 1181419
URL: http://svn.apache.org/viewvc?rev=1181419&view=rev
Log:
Support hbase RPC timeout
Summary:
Allow RPC to have a timeout for each RPC
Test Plan:
unit test
DiffCamp Revision: 174183
Reviewed By: dhruba
Commenters: kannan, nspiegelberg
CC: nspiegelberg, dhruba, hkuang, kannan, hbase@lists
Tasks:
#421804: timeouts on HBase RPCs
Revert Plan:
OK
Modified:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1181419&r1=1181418&r2=1181419&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Tue Oct 11 02:06:57 2011
@@ -385,7 +385,8 @@ public class HConnectionManager {
if (masterLocation != null) {
HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy(
HMasterInterface.class, HBaseRPCProtocolVersion.versionID,
- masterLocation.getInetSocketAddress(), this.conf);
+ masterLocation.getInetSocketAddress(), this.conf,
+ (int)this.rpcTimeout);
if (tryMaster.isMasterRunning()) {
this.master = tryMaster;
@@ -1031,7 +1032,7 @@ public class HConnectionManager {
server = (HRegionInterface)HBaseRPC.waitForProxy(
serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
regionServer.getInetSocketAddress(), this.conf,
- this.maxRPCAttempts, this.rpcTimeout);
+ this.maxRPCAttempts, (int)this.rpcTimeout, this.rpcTimeout);
} catch (RemoteException e) {
throw RemoteExceptionHandler.decodeRemoteException(e);
}
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1181419&r1=1181418&r2=1181419&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Tue Oct 11 02:06:57 2011
@@ -78,7 +78,7 @@ public class HBaseClient {
final protected long failureSleep; // Time to sleep before retry on failure.
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
protected final boolean tcpKeepAlive; // if T then use keepalives
- protected final int pingInterval; // how often sends ping to the server in msecs
+ protected int pingInterval; // how often sends ping to the server in msecs
protected final SocketFactory socketFactory; // how to create sockets
private int refCount = 1;
@@ -193,7 +193,7 @@ public class HBaseClient {
private IOException closeException; // close reason
public Connection(InetSocketAddress address) throws IOException {
- this(new ConnectionId(address, null));
+ this(new ConnectionId(address, null, 0));
}
public Connection(ConnectionId remoteId) throws IOException {
@@ -244,7 +244,8 @@ public class HBaseClient {
* otherwise, throw the timeout exception.
*/
private void handleTimeout(SocketTimeoutException e) throws IOException {
- if (shouldCloseConnection.get() || !running.get()) {
+ if (shouldCloseConnection.get() || !running.get() ||
+ remoteId.rpcTimeout > 0) {
throw e;
}
sendPing();
@@ -307,6 +308,9 @@ public class HBaseClient {
this.socket.setKeepAlive(tcpKeepAlive);
// connection time out is 20s
NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
+ if (remoteId.rpcTimeout > 0) {
+ pingInterval = remoteId.rpcTimeout; // overwrite pingInterval
+ }
this.socket.setSoTimeout(pingInterval);
break;
} catch (SocketTimeoutException toe) {
@@ -715,14 +719,14 @@ public class HBaseClient {
*/
public Writable call(Writable param, InetSocketAddress address)
throws IOException {
- return call(param, address, null);
+ return call(param, address, null, 0);
}
public Writable call(Writable param, InetSocketAddress addr,
- UserGroupInformation ticket)
+ UserGroupInformation ticket, int rpcTimeout)
throws IOException {
Call call = new Call(param);
- Connection connection = getConnection(addr, ticket, call);
+ Connection connection = getConnection(addr, ticket, rpcTimeout, call);
connection.sendParam(call); // send the parameter
boolean interrupted = false;
//noinspection SynchronizationOnLocalVariableOrMethodParameter
@@ -805,7 +809,7 @@ public class HBaseClient {
for (int i = 0; i < params.length; i++) {
ParallelCall call = new ParallelCall(params[i], results, i);
try {
- Connection connection = getConnection(addresses[i], null, call);
+ Connection connection = getConnection(addresses[i], null, 0, call);
connection.sendParam(call); // send each parameter
} catch (IOException e) {
// log errors
@@ -828,6 +832,7 @@ public class HBaseClient {
* pool. Connections to a given host/port are reused. */
private Connection getConnection(InetSocketAddress addr,
UserGroupInformation ticket,
+ int rpcTimeout,
Call call)
throws IOException {
if (!running.get()) {
@@ -839,7 +844,7 @@ public class HBaseClient {
* connectionsId object and with set() method. We need to manage the
* refs for keys in HashMap properly. For now its ok.
*/
- ConnectionId remoteId = new ConnectionId(addr, ticket);
+ ConnectionId remoteId = new ConnectionId(addr, ticket, rpcTimeout);
do {
synchronized (connections) {
connection = connections.get(remoteId);
@@ -865,10 +870,13 @@ public class HBaseClient {
private static class ConnectionId {
final InetSocketAddress address;
final UserGroupInformation ticket;
+ final private int rpcTimeout;
- ConnectionId(InetSocketAddress address, UserGroupInformation ticket) {
+ ConnectionId(InetSocketAddress address, UserGroupInformation ticket,
+ int rpcTimeout) {
this.address = address;
this.ticket = ticket;
+ this.rpcTimeout = rpcTimeout;
}
InetSocketAddress getAddress() {
@@ -882,7 +890,8 @@ public class HBaseClient {
public boolean equals(Object obj) {
if (obj instanceof ConnectionId) {
ConnectionId id = (ConnectionId) obj;
- return address.equals(id.address) && ticket == id.ticket;
+ return address.equals(id.address) && ticket == id.ticket &&
+ rpcTimeout == id.rpcTimeout;
//Note : ticket is a ref comparision.
}
return false;
@@ -890,7 +899,7 @@ public class HBaseClient {
@Override
public int hashCode() {
- return address.hashCode() ^ System.identityHashCode(ticket);
+ return address.hashCode() ^ System.identityHashCode(ticket) ^ rpcTimeout;
}
}
}
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1181419&r1=1181418&r2=1181419&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Tue Oct 11 02:06:57 2011
@@ -228,6 +228,7 @@ public class HBaseRPC {
private UserGroupInformation ticket;
private HBaseClient client;
private boolean isClosed = false;
+ final private int rpcTimeout;
/**
* @param address address for invoker
@@ -236,10 +237,11 @@ public class HBaseRPC {
* @param factory socket factory
*/
public Invoker(InetSocketAddress address, UserGroupInformation ticket,
- Configuration conf, SocketFactory factory) {
+ Configuration conf, SocketFactory factory, int rpcTimeout) {
this.address = address;
this.ticket = ticket;
this.client = CLIENTS.getClient(conf, factory);
+ this.rpcTimeout = rpcTimeout;
}
public Object invoke(Object proxy, Method method, Object[] args)
@@ -250,7 +252,7 @@ public class HBaseRPC {
startTime = System.currentTimeMillis();
}
HbaseObjectWritable value = (HbaseObjectWritable)
- client.call(new Invocation(method, args), address, ticket);
+ client.call(new Invocation(method, args), address, ticket, rpcTimeout);
if (logDebug) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
@@ -321,6 +323,7 @@ public class HBaseRPC {
* @param addr address of remote service
* @param conf configuration
* @param maxAttempts max attempts
+ * @param rpcTimeout timeout for each RPC
* @param timeout timeout in milliseconds
* @return proxy
* @throws IOException e
@@ -331,6 +334,7 @@ public class HBaseRPC {
InetSocketAddress addr,
Configuration conf,
int maxAttempts,
+ int rpcTimeout,
long timeout
) throws IOException {
// HBase does limited number of reconnects which is different from hadoop.
@@ -339,7 +343,7 @@ public class HBaseRPC {
int reconnectAttempts = 0;
while (true) {
try {
- return getProxy(protocol, clientVersion, addr, conf);
+ return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
} catch(ConnectException se) { // namenode has not been started
ioe = se;
if (maxAttempts >= 0 && ++reconnectAttempts >= maxAttempts) {
@@ -375,13 +379,15 @@ public class HBaseRPC {
* @param addr remote address
* @param conf configuration
* @param factory socket factory
+ * @param rpcTimeout timeout for each RPC
* @return proxy
* @throws IOException e
*/
public static VersionedProtocol getProxy(Class<?> protocol,
long clientVersion, InetSocketAddress addr, Configuration conf,
- SocketFactory factory) throws IOException {
- return getProxy(protocol, clientVersion, addr, null, conf, factory);
+ SocketFactory factory, int rpcTimeout) throws IOException {
+ return getProxy(protocol, clientVersion, addr, null, conf, factory,
+ rpcTimeout);
}
/**
@@ -394,17 +400,18 @@ public class HBaseRPC {
* @param ticket ticket
* @param conf configuration
* @param factory socket factory
+ * @param rpcTimeout timeout for each RPC
* @return proxy
* @throws IOException e
*/
public static VersionedProtocol getProxy(Class<?> protocol,
long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
- Configuration conf, SocketFactory factory)
+ Configuration conf, SocketFactory factory, int rpcTimeout)
throws IOException {
VersionedProtocol proxy =
(VersionedProtocol) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] { protocol },
- new Invoker(addr, ticket, conf, factory));
+ new Invoker(addr, ticket, conf, factory, rpcTimeout));
long serverVersion = proxy.getProtocolVersion(protocol.getName(),
clientVersion);
if (serverVersion == clientVersion) {
@@ -421,15 +428,17 @@ public class HBaseRPC {
* @param clientVersion version we are expecting
* @param addr remote address
* @param conf configuration
+ * @param rpcTimeout timeout for each RPC
* @return a proxy instance
* @throws IOException e
*/
public static VersionedProtocol getProxy(Class<?> protocol,
- long clientVersion, InetSocketAddress addr, Configuration conf)
+ long clientVersion, InetSocketAddress addr, Configuration conf,
+ int rpcTimeout)
throws IOException {
return getProxy(protocol, clientVersion, addr, conf, NetUtils
- .getDefaultSocketFactory(conf));
+ .getDefaultSocketFactory(conf), rpcTimeout);
}
/**
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1181419&r1=1181418&r2=1181419&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Oct 11 02:06:57 2011
@@ -1209,7 +1209,8 @@ public class HRegionServer implements HR
// should retry indefinitely.
master = (HMasterRegionInterface)HBaseRPC.waitForProxy(
HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
- masterAddress.getInetSocketAddress(), this.conf, -1, this.rpcTimeout);
+ masterAddress.getInetSocketAddress(), this.conf, -1,
+ (int)this.rpcTimeout, this.rpcTimeout);
} catch (IOException e) {
LOG.warn("Unable to connect to master. Retrying. Error was:", e);
sleeper.sleep();