You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2010/08/05 18:39:11 UTC

svn commit: r982681 - in /hadoop/common/trunk: ./ src/java/org/apache/hadoop/ipc/ src/test/core/org/apache/hadoop/ipc/

Author: hairong
Date: Thu Aug  5 16:39:10 2010
New Revision: 982681

URL: http://svn.apache.org/viewvc?rev=982681&view=rev
Log:
HADOOP-6889. Make RPC to have an option to timeout. Contributed by Hairong Kuang.

Modified:
    hadoop/common/trunk/CHANGES.txt
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestIPC.java

Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=982681&r1=982680&r2=982681&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Thu Aug  5 16:39:10 2010
@@ -33,6 +33,8 @@ Trunk (unreleased changes)
     HADOOP-6892. Common component of HDFS-1150 (Verify datanodes' identities 
     to clients in secure clusters) (jghoman)
 
+    HADOOP-6889. Make RPC to have an option to timeout. (hairong)
+
   IMPROVEMENTS
 
     HADOOP-6644. util.Shell getGROUPS_FOR_USER_COMMAND method name 
@@ -105,6 +107,7 @@ Trunk (unreleased changes)
     periodically. (Owen O'Malley and ddas via ddas)
 
     HADOOP-6890. Improve listFiles API introduced by HADOOP-6870. (hairong)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=982681&r1=982680&r2=982681&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java Thu Aug  5 16:39:10 2010
@@ -98,11 +98,13 @@ class AvroRpcEngine implements RpcEngine
   
     public ClientTransceiver(InetSocketAddress addr,
                              UserGroupInformation ticket,
-                             Configuration conf, SocketFactory factory)
+                             Configuration conf, SocketFactory factory,
+                             int rpcTimeout)
       throws IOException {
       this.tunnel =
         (TunnelProtocol)ENGINE.getProxy(TunnelProtocol.class, VERSION,
-                                        addr, ticket, conf, factory);
+                                        addr, ticket, conf, factory,
+                                        rpcTimeout);
       this.remote = addr;
     }
 
@@ -128,14 +130,15 @@ class AvroRpcEngine implements RpcEngine
 
   /** Construct a client-side proxy object that implements the named protocol,
    * talking to a server at the named address. */
-  public Object getProxy(Class protocol, long clientVersion,
+  public Object getProxy(Class<?> protocol, long clientVersion,
                          InetSocketAddress addr, UserGroupInformation ticket,
-                         Configuration conf, SocketFactory factory)
+                         Configuration conf, SocketFactory factory,
+                         int rpcTimeout)
     throws IOException {
     return Proxy.newProxyInstance
       (protocol.getClassLoader(),
        new Class[] { protocol },
-       new Invoker(protocol, addr, ticket, conf, factory));
+       new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
   }
 
   /** Stop this proxy. */
@@ -152,8 +155,9 @@ class AvroRpcEngine implements RpcEngine
     private final ReflectRequestor requestor;
     public Invoker(Class<?> protocol, InetSocketAddress addr,
                    UserGroupInformation ticket, Configuration conf,
-                   SocketFactory factory) throws IOException {
-      this.tx = new ClientTransceiver(addr, ticket, conf, factory);
+                   SocketFactory factory,
+                   int rpcTimeout) throws IOException {
+      this.tx = new ClientTransceiver(addr, ticket, conf, factory, rpcTimeout);
       this.requestor = new ReflectRequestor(protocol, tx);
     }
     @Override public Object invoke(Object proxy, Method method, Object[] args) 
@@ -169,7 +173,7 @@ class AvroRpcEngine implements RpcEngine
   private static class TunnelResponder extends ReflectResponder
     implements TunnelProtocol {
 
-    public TunnelResponder(Class iface, Object impl) {
+    public TunnelResponder(Class<?> iface, Object impl) {
       super(iface, impl);
     }
 
@@ -192,7 +196,7 @@ class AvroRpcEngine implements RpcEngine
 
   /** Construct a server for a protocol implementation instance listening on a
    * port and address. */
-  public RPC.Server getServer(Class iface, Object impl, String bindAddress,
+  public RPC.Server getServer(Class<?> iface, Object impl, String bindAddress,
                               int port, int numHandlers, boolean verbose,
                               Configuration conf, 
                        SecretManager<? extends TokenIdentifier> secretManager

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=982681&r1=982680&r2=982681&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java Thu Aug  5 16:39:10 2010
@@ -219,6 +219,7 @@ public class Client {
     private Socket socket = null;                 // connected socket
     private DataInputStream in;
     private DataOutputStream out;
+    private int rpcTimeout;
     
     // currently active calls
     private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
@@ -233,7 +234,7 @@ public class Client {
         throw new UnknownHostException("unknown host: " + 
                                        remoteId.getAddress().getHostName());
       }
-      
+      this.rpcTimeout = remoteId.getRpcTimeout();
       UserGroupInformation ticket = remoteId.getTicket();
       Class<?> protocol = remoteId.getProtocol();
       this.useSasl = UserGroupInformation.isSecurityEnabled();
@@ -321,11 +322,13 @@ public class Client {
       }
 
       /* Process timeout exception
-       * if the connection is not going to be closed, send a ping.
+       * if the connection is not going to be closed and
+       * is not configured to have an RPC timeout, send a ping.
+       * (If rpcTimeout is greater than 0, the RPC should time out.)
        * otherwise, throw the timeout exception.
        */
       private void handleTimeout(SocketTimeoutException e) throws IOException {
-        if (shouldCloseConnection.get() || !running.get()) {
+        if (shouldCloseConnection.get() || !running.get() || rpcTimeout > 0) {
           throw e;
         } else {
           sendPing();
@@ -405,6 +408,9 @@ public class Client {
           this.socket.setTcpNoDelay(tcpNoDelay);
           // connection time out is 20s
           NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
+          if (rpcTimeout > 0) {
+            pingInterval = rpcTimeout;  // rpcTimeout overwrites pingInterval
+          }
           this.socket.setSoTimeout(pingInterval);
           return;
         } catch (SocketTimeoutException toe) {
@@ -952,7 +958,7 @@ public class Client {
   public Writable call(Writable param, InetSocketAddress addr, 
       UserGroupInformation ticket)  
       throws InterruptedException, IOException {
-    return call(param, addr, null, ticket);
+    return call(param, addr, null, ticket, 0);
   }
   
   /** Make a call, passing <code>param</code>, to the IPC server running at
@@ -961,10 +967,12 @@ public class Client {
    * Throws exceptions if there are network problems or if the remote code 
    * threw an exception. */
   public Writable call(Writable param, InetSocketAddress addr, 
-                       Class<?> protocol, UserGroupInformation ticket)  
+                       Class<?> protocol, UserGroupInformation ticket,
+                       int rpcTimeout)  
                        throws InterruptedException, IOException {
     Call call = new Call(param);
-    Connection connection = getConnection(addr, protocol, ticket, call);
+    Connection connection = getConnection(
+        addr, protocol, ticket, rpcTimeout, call);
     connection.sendParam(call);                 // send the parameter
     boolean interrupted = false;
     synchronized (call) {
@@ -1054,7 +1062,7 @@ public class Client {
         ParallelCall call = new ParallelCall(params[i], results, i);
         try {
           Connection connection = 
-            getConnection(addresses[i], protocol, ticket, call);
+            getConnection(addresses[i], protocol, ticket, 0, call);
           connection.sendParam(call);             // send each parameter
         } catch (IOException e) {
           // log errors
@@ -1078,6 +1086,7 @@ public class Client {
   private Connection getConnection(InetSocketAddress addr,
                                    Class<?> protocol,
                                    UserGroupInformation ticket,
+                                   int rpcTimeout,
                                    Call call)
                                    throws IOException, InterruptedException {
     if (!running.get()) {
@@ -1089,7 +1098,8 @@ public class Client {
      * connectionsId object and with set() method. We need to manage the
      * refs for keys in HashMap properly. For now its ok.
      */
-    ConnectionId remoteId = new ConnectionId(addr, protocol, ticket);
+    ConnectionId remoteId = new ConnectionId(
+        addr, protocol, ticket, rpcTimeout);
     do {
       synchronized (connections) {
         connection = connections.get(remoteId);
@@ -1117,12 +1127,14 @@ public class Client {
     UserGroupInformation ticket;
     Class<?> protocol;
     private static final int PRIME = 16777619;
+    private int rpcTimeout;
     
     ConnectionId(InetSocketAddress address, Class<?> protocol, 
-                 UserGroupInformation ticket) {
+                 UserGroupInformation ticket, int rpcTimeout) {
       this.protocol = protocol;
       this.address = address;
       this.ticket = ticket;
+      this.rpcTimeout = rpcTimeout;
     }
     
     InetSocketAddress getAddress() {
@@ -1137,6 +1149,9 @@ public class Client {
       return ticket;
     }
     
+    private int getRpcTimeout() {
+      return rpcTimeout;
+    }
     
     @Override
     public boolean equals(Object obj) {
@@ -1144,15 +1159,19 @@ public class Client {
        ConnectionId id = (ConnectionId) obj;
        return address.equals(id.address) && protocol == id.protocol && 
               ((ticket != null && ticket.equals(id.ticket)) ||
-               (ticket == id.ticket));
+               (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout;
      }
      return false;
     }
     
-    @Override
+    @Override  // simply use the default Object#hashCode() ?
     public int hashCode() {
-      return (address.hashCode() + PRIME * System.identityHashCode(protocol)) ^ 
-             (ticket == null ? 0 : ticket.hashCode());
+      return (address.hashCode() + PRIME * (
+                PRIME * (
+                  PRIME * System.identityHashCode(protocol) ^
+                  System.identityHashCode(ticket)
+                ) ^ System.identityHashCode(rpcTimeout)
+              ));
     }
   }  
 }

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=982681&r1=982680&r2=982681&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java Thu Aug  5 16:39:10 2010
@@ -156,7 +156,7 @@ public class RPC {
   }
   
   public static Object waitForProxy(
-      Class protocol,
+      Class<?> protocol,
       long clientVersion,
       InetSocketAddress addr,
       Configuration conf
@@ -170,18 +170,37 @@ public class RPC {
    * @param clientVersion client version
    * @param addr remote address
    * @param conf configuration to use
-   * @param timeout time in milliseconds before giving up
+   * @param connTimeout time in milliseconds before giving up
    * @return the proxy
    * @throws IOException if the far end through a RemoteException
    */
-  public static Object waitForProxy(Class protocol, long clientVersion,
+  public static Object waitForProxy(Class<?> protocol, long clientVersion,
                              InetSocketAddress addr, Configuration conf,
-                             long timeout) throws IOException { 
+                             long connTimeout) throws IOException { 
+    return waitForProxy(protocol, clientVersion, addr, conf, 0, connTimeout);
+  }
+    /**
+     * Get a proxy connection to a remote server
+     * @param protocol protocol class
+     * @param clientVersion client version
+     * @param addr remote address
+     * @param conf configuration to use
+     * @param rpcTimeout timeout for each RPC
+     * @param timeout time in milliseconds before giving up
+     * @return the proxy
+     * @throws IOException if the far end threw a RemoteException
+     */
+    public static Object waitForProxy(Class<?> protocol, long clientVersion,
+                               InetSocketAddress addr, Configuration conf,
+                               int rpcTimeout,
+                               long timeout) throws IOException { 
     long startTime = System.currentTimeMillis();
     IOException ioe;
     while (true) {
       try {
-        return getProxy(protocol, clientVersion, addr, conf);
+        return getProxy(protocol, clientVersion, addr, 
+            UserGroupInformation.getCurrentUser(), conf, NetUtils
+            .getDefaultSocketFactory(conf), rpcTimeout);
       } catch(ConnectException se) {  // namenode has not been started
         LOG.info("Server at " + addr + " not available yet, Zzzzz...");
         ioe = se;
@@ -208,7 +227,7 @@ public class RPC {
 
   /** Construct a client-side proxy object that implements the named protocol,
    * talking to a server at the named address. */
-  public static Object getProxy(Class protocol, long clientVersion,
+  public static Object getProxy(Class<?> protocol, long clientVersion,
                                 InetSocketAddress addr, Configuration conf,
                                 SocketFactory factory) throws IOException {
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
@@ -217,16 +236,39 @@ public class RPC {
   
   /** Construct a client-side proxy object that implements the named protocol,
    * talking to a server at the named address. */
-  public static Object getProxy(Class protocol, long clientVersion,
+  public static Object getProxy(Class<?> protocol, long clientVersion,
+                                InetSocketAddress addr,
+                                UserGroupInformation ticket,
+                                Configuration conf,
+                                SocketFactory factory) throws IOException {
+    return getProxy(protocol, clientVersion, addr, ticket, conf, factory, 0);
+  }
+  
+  /**
+   * Construct a client-side proxy that implements the named protocol,
+   * talking to a server at the named address.
+   * 
+   * @param protocol protocol
+   * @param clientVersion client's version
+   * @param addr server address
+   * @param ticket security ticket
+   * @param conf configuration
+   * @param factory socket factory
+   * @param rpcTimeout max time for each rpc; 0 means no timeout
+   * @return the proxy
+   * @throws IOException if any error occurs
+   */
+  public static Object getProxy(Class<?> protocol, long clientVersion,
                                 InetSocketAddress addr,
                                 UserGroupInformation ticket,
                                 Configuration conf,
-                                SocketFactory factory) throws IOException {    
+                                SocketFactory factory,
+                                int rpcTimeout) throws IOException {    
     if (UserGroupInformation.isSecurityEnabled()) {
       SaslRpcServer.init(conf);
     }
-    return getProtocolEngine(protocol,conf)
-      .getProxy(protocol, clientVersion, addr, ticket, conf, factory);
+    return getProtocolEngine(protocol,conf).getProxy(protocol,
+        clientVersion, addr, ticket, conf, factory, rpcTimeout);
   }
 
   /**
@@ -239,7 +281,7 @@ public class RPC {
    * @return a proxy instance
    * @throws IOException
    */
-  public static Object getProxy(Class protocol, long clientVersion,
+  public static Object getProxy(Class<?> protocol, long clientVersion,
                                 InetSocketAddress addr, Configuration conf)
     throws IOException {
 

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java?rev=982681&r1=982680&r2=982681&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java Thu Aug  5 16:39:10 2010
@@ -32,10 +32,10 @@ import org.apache.hadoop.conf.Configurat
 interface RpcEngine {
 
   /** Construct a client-side proxy object. */
-  Object getProxy(Class protocol,
+  Object getProxy(Class<?> protocol,
                   long clientVersion, InetSocketAddress addr,
                   UserGroupInformation ticket, Configuration conf,
-                  SocketFactory factory) throws IOException;
+                  SocketFactory factory, int rpcTimeout) throws IOException;
 
   /** Stop this proxy. */
   void stopProxy(Object proxy);
@@ -46,7 +46,7 @@ interface RpcEngine {
     throws IOException, InterruptedException;
 
   /** Construct a server for a protocol implementation instance. */
-  RPC.Server getServer(Class protocol, Object instance, String bindAddress,
+  RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress,
                        int port, int numHandlers, boolean verbose,
                        Configuration conf, 
                        SecretManager<? extends TokenIdentifier> secretManager

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=982681&r1=982680&r2=982681&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java Thu Aug  5 16:39:10 2010
@@ -48,7 +48,7 @@ class WritableRpcEngine implements RpcEn
   /** A method invocation, including the method name and its parameters.*/
   private static class Invocation implements Writable, Configurable {
     private String methodName;
-    private Class[] parameterClasses;
+    private Class<?>[] parameterClasses;
     private Object[] parameters;
     private Configuration conf;
 
@@ -64,7 +64,7 @@ class WritableRpcEngine implements RpcEn
     public String getMethodName() { return methodName; }
 
     /** The parameter classes. */
-    public Class[] getParameterClasses() { return parameterClasses; }
+    public Class<?>[] getParameterClasses() { return parameterClasses; }
 
     /** The parameter instances. */
     public Object[] getParameters() { return parameters; }
@@ -172,18 +172,21 @@ class WritableRpcEngine implements RpcEn
   private static ClientCache CLIENTS=new ClientCache();
   
   private static class Invoker implements InvocationHandler {
-    private Class protocol;
+    private Class<?> protocol;
     private InetSocketAddress address;
     private UserGroupInformation ticket;
+    private int rpcTimeout;
     private Client client;
     private boolean isClosed = false;
 
-    public Invoker(Class protocol,
+    public Invoker(Class<?> protocol,
                    InetSocketAddress address, UserGroupInformation ticket,
-                   Configuration conf, SocketFactory factory) {
+                   Configuration conf, SocketFactory factory,
+                   int rpcTimeout) {
       this.protocol = protocol;
       this.address = address;
       this.ticket = ticket;
+      this.rpcTimeout = rpcTimeout;
       this.client = CLIENTS.getClient(conf, factory);
     }
 
@@ -197,7 +200,7 @@ class WritableRpcEngine implements RpcEn
 
       ObjectWritable value = (ObjectWritable)
         client.call(new Invocation(method, args), address, 
-                    protocol, ticket);
+                    protocol, ticket, rpcTimeout);
       if (logDebug) {
         long callTime = System.currentTimeMillis() - startTime;
         LOG.debug("Call: " + method.getName() + " " + callTime);
@@ -216,14 +219,15 @@ class WritableRpcEngine implements RpcEn
   
   /** Construct a client-side proxy object that implements the named protocol,
    * talking to a server at the named address. */
-  public Object getProxy(Class protocol, long clientVersion,
+  public Object getProxy(Class<?> protocol, long clientVersion,
                          InetSocketAddress addr, UserGroupInformation ticket,
-                         Configuration conf, SocketFactory factory)
+                         Configuration conf, SocketFactory factory,
+                         int rpcTimeout)
     throws IOException {    
 
     Object proxy = Proxy.newProxyInstance
       (protocol.getClassLoader(), new Class[] { protocol },
-       new Invoker(protocol, addr, ticket, conf, factory));
+       new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
     if (proxy instanceof VersionedProtocol) {
       long serverVersion = ((VersionedProtocol)proxy)
         .getProtocolVersion(protocol.getName(), clientVersion);
@@ -276,7 +280,7 @@ class WritableRpcEngine implements RpcEn
 
   /** Construct a server for a protocol implementation instance listening on a
    * port and address. */
-  public Server getServer(Class protocol,
+  public Server getServer(Class<?> protocol,
                           Object instance, String bindAddress, int port,
                           int numHandlers, boolean verbose, Configuration conf,
                       SecretManager<? extends TokenIdentifier> secretManager) 

Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestIPC.java?rev=982681&r1=982680&r2=982681&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestIPC.java Thu Aug  5 16:39:10 2010
@@ -29,6 +29,7 @@ import java.util.Random;
 import java.io.DataInput;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
 import javax.net.SocketFactory;
 
 import junit.framework.TestCase;
@@ -43,6 +44,7 @@ public class TestIPC extends TestCase {
   
   final private static Configuration conf = new Configuration();
   final static private int PING_INTERVAL = 1000;
+  final static private int MIN_SLEEP_TIME = 1000;
   
   static {
     Client.setPingInterval(conf, PING_INTERVAL);
@@ -66,8 +68,9 @@ public class TestIPC extends TestCase {
     public Writable call(Class<?> protocol, Writable param, long receiveTime)
         throws IOException {
       if (sleep) {
+        // sleep a bit
         try {
-          Thread.sleep(RANDOM.nextInt(2*PING_INTERVAL));      // sleep a bit
+          Thread.sleep(RANDOM.nextInt(PING_INTERVAL) + MIN_SLEEP_TIME);
         } catch (InterruptedException e) {}
       }
       return param;                               // echo param as result
@@ -91,7 +94,7 @@ public class TestIPC extends TestCase {
         try {
           LongWritable param = new LongWritable(RANDOM.nextLong());
           LongWritable value =
-            (LongWritable)client.call(param, server, null, null);
+            (LongWritable)client.call(param, server, null, null, 0);
           if (!param.equals(value)) {
             LOG.fatal("Call failed!");
             failed = true;
@@ -142,6 +145,7 @@ public class TestIPC extends TestCase {
 
   public void testSerial() throws Exception {
     testSerial(3, false, 2, 5, 100);
+    testSerial(3, true, 2, 5, 10);
   }
 
   public void testSerial(int handlerCount, boolean handlerSleep, 
@@ -219,7 +223,7 @@ public class TestIPC extends TestCase {
     InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
     try {
       client.call(new LongWritable(RANDOM.nextLong()),
-              address, null, null);
+              address, null, null, 0);
       fail("Expected an exception to have been thrown");
     } catch (IOException e) {
       String message = e.getMessage();
@@ -276,7 +280,7 @@ public class TestIPC extends TestCase {
     Client client = new Client(LongErrorWritable.class, conf);
     try {
       client.call(new LongErrorWritable(RANDOM.nextLong()),
-              addr, null, null);
+              addr, null, null, 0);
       fail("Expected an exception to have been thrown");
     } catch (IOException e) {
       // check error
@@ -296,7 +300,7 @@ public class TestIPC extends TestCase {
     Client client = new Client(LongRTEWritable.class, conf);
     try {
       client.call(new LongRTEWritable(RANDOM.nextLong()),
-              addr, null, null);
+              addr, null, null, 0);
       fail("Expected an exception to have been thrown");
     } catch (IOException e) {
       // check error
@@ -322,14 +326,34 @@ public class TestIPC extends TestCase {
     InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
     try {
       client.call(new LongWritable(RANDOM.nextLong()),
-              address, null, null);
+              address, null, null, 0);
       fail("Expected an exception to have been thrown");
     } catch (IOException e) {
       assertTrue(e.getMessage().contains("Injected fault"));
     }
   }
 
+  public void testIpcTimeout() throws Exception {
+    // start server
+    Server server = new TestServer(1, true);
+    InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    server.start();
 
+    // start client
+    Client client = new Client(LongWritable.class, conf);
+    // set timeout to be less than MIN_SLEEP_TIME
+    try {
+      client.call(new LongWritable(RANDOM.nextLong()),
+              addr, null, null, MIN_SLEEP_TIME/2);
+      fail("Expected an exception to have been thrown");
+    } catch (SocketTimeoutException e) {
+      LOG.info("Get a SocketTimeoutException ", e);
+    }
+    // set timeout to be bigger than 3*ping interval
+    client.call(new LongWritable(RANDOM.nextLong()),
+        addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME);
+  }
+  
   public static void main(String[] args) throws Exception {
 
     //new TestIPC("test").testSerial(5, false, 2, 10, 1000);