You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:06:58 UTC

svn commit: r1181419 - in /hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase: client/HConnectionManager.java ipc/HBaseClient.java ipc/HBaseRPC.java regionserver/HRegionServer.java

Author: nspiegelberg
Date: Tue Oct 11 02:06:57 2011
New Revision: 1181419

URL: http://svn.apache.org/viewvc?rev=1181419&view=rev
Log:
Support HBase RPC timeout

Summary:
Allow a timeout to be specified for each RPC call

Test Plan:
unit test

DiffCamp Revision: 174183
Reviewed By: dhruba
Commenters: kannan, nspiegelberg
CC: nspiegelberg, dhruba, hkuang, kannan, hbase@lists
Tasks:
#421804: timeouts on HBase RPCs

Revert Plan:
OK

Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1181419&r1=1181418&r2=1181419&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Tue Oct 11 02:06:57 2011
@@ -385,7 +385,8 @@ public class HConnectionManager {
             if (masterLocation != null) {
               HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy(
                   HMasterInterface.class, HBaseRPCProtocolVersion.versionID,
-                  masterLocation.getInetSocketAddress(), this.conf);
+                  masterLocation.getInetSocketAddress(), this.conf,
+                  (int)this.rpcTimeout);
 
               if (tryMaster.isMasterRunning()) {
                 this.master = tryMaster;
@@ -1031,7 +1032,7 @@ public class HConnectionManager {
             server = (HRegionInterface)HBaseRPC.waitForProxy(
                 serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
                 regionServer.getInetSocketAddress(), this.conf,
-                this.maxRPCAttempts, this.rpcTimeout);
+                this.maxRPCAttempts, (int)this.rpcTimeout, this.rpcTimeout);
           } catch (RemoteException e) {
             throw RemoteExceptionHandler.decodeRemoteException(e);
           }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1181419&r1=1181418&r2=1181419&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Tue Oct 11 02:06:57 2011
@@ -78,7 +78,7 @@ public class HBaseClient {
   final protected long failureSleep; // Time to sleep before retry on failure.
   protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
   protected final boolean tcpKeepAlive; // if T then use keepalives
-  protected final int pingInterval; // how often sends ping to the server in msecs
+  protected int pingInterval; // how often sends ping to the server in msecs
 
   protected final SocketFactory socketFactory;           // how to create sockets
   private int refCount = 1;
@@ -193,7 +193,7 @@ public class HBaseClient {
     private IOException closeException; // close reason
 
     public Connection(InetSocketAddress address) throws IOException {
-      this(new ConnectionId(address, null));
+      this(new ConnectionId(address, null, 0));
     }
 
     public Connection(ConnectionId remoteId) throws IOException {
@@ -244,7 +244,8 @@ public class HBaseClient {
        * otherwise, throw the timeout exception.
        */
       private void handleTimeout(SocketTimeoutException e) throws IOException {
-        if (shouldCloseConnection.get() || !running.get()) {
+        if (shouldCloseConnection.get() || !running.get() ||
+            remoteId.rpcTimeout > 0) {
           throw e;
         }
         sendPing();
@@ -307,6 +308,9 @@ public class HBaseClient {
             this.socket.setKeepAlive(tcpKeepAlive);
             // connection time out is 20s
             NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
+            if (remoteId.rpcTimeout > 0) {
+              pingInterval = remoteId.rpcTimeout; // overwrite pingInterval
+            }
             this.socket.setSoTimeout(pingInterval);
             break;
           } catch (SocketTimeoutException toe) {
@@ -715,14 +719,14 @@ public class HBaseClient {
    */
   public Writable call(Writable param, InetSocketAddress address)
   throws IOException {
-      return call(param, address, null);
+      return call(param, address, null, 0);
   }
 
   public Writable call(Writable param, InetSocketAddress addr,
-                       UserGroupInformation ticket)
+                       UserGroupInformation ticket, int rpcTimeout)
                        throws IOException {
     Call call = new Call(param);
-    Connection connection = getConnection(addr, ticket, call);
+    Connection connection = getConnection(addr, ticket, rpcTimeout, call);
     connection.sendParam(call);                 // send the parameter
     boolean interrupted = false;
     //noinspection SynchronizationOnLocalVariableOrMethodParameter
@@ -805,7 +809,7 @@ public class HBaseClient {
       for (int i = 0; i < params.length; i++) {
         ParallelCall call = new ParallelCall(params[i], results, i);
         try {
-          Connection connection = getConnection(addresses[i], null, call);
+          Connection connection = getConnection(addresses[i], null, 0, call);
           connection.sendParam(call);             // send each parameter
         } catch (IOException e) {
           // log errors
@@ -828,6 +832,7 @@ public class HBaseClient {
    * pool.  Connections to a given host/port are reused. */
   private Connection getConnection(InetSocketAddress addr,
                                    UserGroupInformation ticket,
+                                   int rpcTimeout,
                                    Call call)
                                    throws IOException {
     if (!running.get()) {
@@ -839,7 +844,7 @@ public class HBaseClient {
      * connectionsId object and with set() method. We need to manage the
      * refs for keys in HashMap properly. For now its ok.
      */
-    ConnectionId remoteId = new ConnectionId(addr, ticket);
+    ConnectionId remoteId = new ConnectionId(addr, ticket, rpcTimeout);
     do {
       synchronized (connections) {
         connection = connections.get(remoteId);
@@ -865,10 +870,13 @@ public class HBaseClient {
   private static class ConnectionId {
     final InetSocketAddress address;
     final UserGroupInformation ticket;
+    final private int rpcTimeout;
 
-    ConnectionId(InetSocketAddress address, UserGroupInformation ticket) {
+    ConnectionId(InetSocketAddress address, UserGroupInformation ticket,
+        int rpcTimeout) {
       this.address = address;
       this.ticket = ticket;
+      this.rpcTimeout = rpcTimeout;
     }
 
     InetSocketAddress getAddress() {
@@ -882,7 +890,8 @@ public class HBaseClient {
     public boolean equals(Object obj) {
      if (obj instanceof ConnectionId) {
        ConnectionId id = (ConnectionId) obj;
-       return address.equals(id.address) && ticket == id.ticket;
+       return address.equals(id.address) && ticket == id.ticket &&
+       rpcTimeout == id.rpcTimeout;
        //Note : ticket is a ref comparision.
      }
      return false;
@@ -890,7 +899,7 @@ public class HBaseClient {
 
     @Override
     public int hashCode() {
-      return address.hashCode() ^ System.identityHashCode(ticket);
+      return address.hashCode() ^ System.identityHashCode(ticket) ^ rpcTimeout;
     }
   }
 }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1181419&r1=1181418&r2=1181419&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Tue Oct 11 02:06:57 2011
@@ -228,6 +228,7 @@ public class HBaseRPC {
     private UserGroupInformation ticket;
     private HBaseClient client;
     private boolean isClosed = false;
+    final private int rpcTimeout;
 
     /**
      * @param address address for invoker
@@ -236,10 +237,11 @@ public class HBaseRPC {
      * @param factory socket factory
      */
     public Invoker(InetSocketAddress address, UserGroupInformation ticket,
-                   Configuration conf, SocketFactory factory) {
+                   Configuration conf, SocketFactory factory, int rpcTimeout) {
       this.address = address;
       this.ticket = ticket;
       this.client = CLIENTS.getClient(conf, factory);
+      this.rpcTimeout = rpcTimeout;
     }
 
     public Object invoke(Object proxy, Method method, Object[] args)
@@ -250,7 +252,7 @@ public class HBaseRPC {
         startTime = System.currentTimeMillis();
       }
       HbaseObjectWritable value = (HbaseObjectWritable)
-        client.call(new Invocation(method, args), address, ticket);
+        client.call(new Invocation(method, args), address, ticket, rpcTimeout);
       if (logDebug) {
         long callTime = System.currentTimeMillis() - startTime;
         LOG.debug("Call: " + method.getName() + " " + callTime);
@@ -321,6 +323,7 @@ public class HBaseRPC {
    * @param addr address of remote service
    * @param conf configuration
    * @param maxAttempts max attempts
+   * @param rpcTimeout timeout for each RPC
    * @param timeout timeout in milliseconds
    * @return proxy
    * @throws IOException e
@@ -331,6 +334,7 @@ public class HBaseRPC {
                                                InetSocketAddress addr,
                                                Configuration conf,
                                                int maxAttempts,
+                                               int rpcTimeout,
                                                long timeout
                                                ) throws IOException {
     // HBase does limited number of reconnects which is different from hadoop.
@@ -339,7 +343,7 @@ public class HBaseRPC {
     int reconnectAttempts = 0;
     while (true) {
       try {
-        return getProxy(protocol, clientVersion, addr, conf);
+        return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
       } catch(ConnectException se) {  // namenode has not been started
         ioe = se;
         if (maxAttempts >= 0 && ++reconnectAttempts >= maxAttempts) {
@@ -375,13 +379,15 @@ public class HBaseRPC {
    * @param addr remote address
    * @param conf configuration
    * @param factory socket factory
+   * @param rpcTimeout timeout for each RPC
    * @return proxy
    * @throws IOException e
    */
   public static VersionedProtocol getProxy(Class<?> protocol,
       long clientVersion, InetSocketAddress addr, Configuration conf,
-      SocketFactory factory) throws IOException {
-    return getProxy(protocol, clientVersion, addr, null, conf, factory);
+      SocketFactory factory, int rpcTimeout) throws IOException {
+    return getProxy(protocol, clientVersion, addr, null, conf, factory,
+        rpcTimeout);
   }
 
   /**
@@ -394,17 +400,18 @@ public class HBaseRPC {
    * @param ticket ticket
    * @param conf configuration
    * @param factory socket factory
+   * @param rpcTimeout timeout for each RPC
    * @return proxy
    * @throws IOException e
    */
   public static VersionedProtocol getProxy(Class<?> protocol,
       long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
-      Configuration conf, SocketFactory factory)
+      Configuration conf, SocketFactory factory, int rpcTimeout)
   throws IOException {
     VersionedProtocol proxy =
         (VersionedProtocol) Proxy.newProxyInstance(
             protocol.getClassLoader(), new Class[] { protocol },
-            new Invoker(addr, ticket, conf, factory));
+            new Invoker(addr, ticket, conf, factory, rpcTimeout));
     long serverVersion = proxy.getProtocolVersion(protocol.getName(),
                                                   clientVersion);
     if (serverVersion == clientVersion) {
@@ -421,15 +428,17 @@ public class HBaseRPC {
    * @param clientVersion version we are expecting
    * @param addr remote address
    * @param conf configuration
+   * @param rpcTimeout timeout for each RPC
    * @return a proxy instance
    * @throws IOException e
    */
   public static VersionedProtocol getProxy(Class<?> protocol,
-      long clientVersion, InetSocketAddress addr, Configuration conf)
+      long clientVersion, InetSocketAddress addr, Configuration conf,
+      int rpcTimeout)
       throws IOException {
 
     return getProxy(protocol, clientVersion, addr, conf, NetUtils
-        .getDefaultSocketFactory(conf));
+        .getDefaultSocketFactory(conf), rpcTimeout);
   }
 
   /**

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1181419&r1=1181418&r2=1181419&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Oct 11 02:06:57 2011
@@ -1209,7 +1209,8 @@ public class HRegionServer implements HR
         // should retry indefinitely.
         master = (HMasterRegionInterface)HBaseRPC.waitForProxy(
           HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
-          masterAddress.getInetSocketAddress(), this.conf, -1, this.rpcTimeout);
+          masterAddress.getInetSocketAddress(), this.conf, -1,
+          (int)this.rpcTimeout, this.rpcTimeout);
       } catch (IOException e) {
         LOG.warn("Unable to connect to master. Retrying. Error was:", e);
         sleeper.sleep();