You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2010/08/05 18:39:11 UTC
svn commit: r982681 - in /hadoop/common/trunk: ./
src/java/org/apache/hadoop/ipc/ src/test/core/org/apache/hadoop/ipc/
Author: hairong
Date: Thu Aug 5 16:39:10 2010
New Revision: 982681
URL: http://svn.apache.org/viewvc?rev=982681&view=rev
Log:
HADOOP-6889. Make RPC to have an option to timeout. Contributed by Hairong Kuang.
Modified:
hadoop/common/trunk/CHANGES.txt
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestIPC.java
Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=982681&r1=982680&r2=982681&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Thu Aug 5 16:39:10 2010
@@ -33,6 +33,8 @@ Trunk (unreleased changes)
HADOOP-6892. Common component of HDFS-1150 (Verify datanodes' identities
to clients in secure clusters) (jghoman)
+ HADOOP-6889. Make RPC to have an option to timeout. (hairong)
+
IMPROVEMENTS
HADOOP-6644. util.Shell getGROUPS_FOR_USER_COMMAND method name
@@ -105,6 +107,7 @@ Trunk (unreleased changes)
periodically. (Owen O'Malley and ddas via ddas)
HADOOP-6890. Improve listFiles API introduced by HADOOP-6870. (hairong)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=982681&r1=982680&r2=982681&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java Thu Aug 5 16:39:10 2010
@@ -98,11 +98,13 @@ class AvroRpcEngine implements RpcEngine
public ClientTransceiver(InetSocketAddress addr,
UserGroupInformation ticket,
- Configuration conf, SocketFactory factory)
+ Configuration conf, SocketFactory factory,
+ int rpcTimeout)
throws IOException {
this.tunnel =
(TunnelProtocol)ENGINE.getProxy(TunnelProtocol.class, VERSION,
- addr, ticket, conf, factory);
+ addr, ticket, conf, factory,
+ rpcTimeout);
this.remote = addr;
}
@@ -128,14 +130,15 @@ class AvroRpcEngine implements RpcEngine
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */
- public Object getProxy(Class protocol, long clientVersion,
+ public Object getProxy(Class<?> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket,
- Configuration conf, SocketFactory factory)
+ Configuration conf, SocketFactory factory,
+ int rpcTimeout)
throws IOException {
return Proxy.newProxyInstance
(protocol.getClassLoader(),
new Class[] { protocol },
- new Invoker(protocol, addr, ticket, conf, factory));
+ new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
}
/** Stop this proxy. */
@@ -152,8 +155,9 @@ class AvroRpcEngine implements RpcEngine
private final ReflectRequestor requestor;
public Invoker(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf,
- SocketFactory factory) throws IOException {
- this.tx = new ClientTransceiver(addr, ticket, conf, factory);
+ SocketFactory factory,
+ int rpcTimeout) throws IOException {
+ this.tx = new ClientTransceiver(addr, ticket, conf, factory, rpcTimeout);
this.requestor = new ReflectRequestor(protocol, tx);
}
@Override public Object invoke(Object proxy, Method method, Object[] args)
@@ -169,7 +173,7 @@ class AvroRpcEngine implements RpcEngine
private static class TunnelResponder extends ReflectResponder
implements TunnelProtocol {
- public TunnelResponder(Class iface, Object impl) {
+ public TunnelResponder(Class<?> iface, Object impl) {
super(iface, impl);
}
@@ -192,7 +196,7 @@ class AvroRpcEngine implements RpcEngine
/** Construct a server for a protocol implementation instance listening on a
* port and address. */
- public RPC.Server getServer(Class iface, Object impl, String bindAddress,
+ public RPC.Server getServer(Class<?> iface, Object impl, String bindAddress,
int port, int numHandlers, boolean verbose,
Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=982681&r1=982680&r2=982681&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java Thu Aug 5 16:39:10 2010
@@ -219,6 +219,7 @@ public class Client {
private Socket socket = null; // connected socket
private DataInputStream in;
private DataOutputStream out;
+ private int rpcTimeout;
// currently active calls
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
@@ -233,7 +234,7 @@ public class Client {
throw new UnknownHostException("unknown host: " +
remoteId.getAddress().getHostName());
}
-
+ this.rpcTimeout = remoteId.getRpcTimeout();
UserGroupInformation ticket = remoteId.getTicket();
Class<?> protocol = remoteId.getProtocol();
this.useSasl = UserGroupInformation.isSecurityEnabled();
@@ -321,11 +322,13 @@ public class Client {
}
/* Process timeout exception
- * if the connection is not going to be closed, send a ping.
+ * if the connection is not going to be closed or
+ * is not configured to have a RPC timeout, send a ping.
+ * (if rpcTimeout is not set to be 0, then RPC should timeout.
* otherwise, throw the timeout exception.
*/
private void handleTimeout(SocketTimeoutException e) throws IOException {
- if (shouldCloseConnection.get() || !running.get()) {
+ if (shouldCloseConnection.get() || !running.get() || rpcTimeout > 0) {
throw e;
} else {
sendPing();
@@ -405,6 +408,9 @@ public class Client {
this.socket.setTcpNoDelay(tcpNoDelay);
// connection time out is 20s
NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
+ if (rpcTimeout > 0) {
+ pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval
+ }
this.socket.setSoTimeout(pingInterval);
return;
} catch (SocketTimeoutException toe) {
@@ -952,7 +958,7 @@ public class Client {
public Writable call(Writable param, InetSocketAddress addr,
UserGroupInformation ticket)
throws InterruptedException, IOException {
- return call(param, addr, null, ticket);
+ return call(param, addr, null, ticket, 0);
}
/** Make a call, passing <code>param</code>, to the IPC server running at
@@ -961,10 +967,12 @@ public class Client {
* Throws exceptions if there are network problems or if the remote code
* threw an exception. */
public Writable call(Writable param, InetSocketAddress addr,
- Class<?> protocol, UserGroupInformation ticket)
+ Class<?> protocol, UserGroupInformation ticket,
+ int rpcTimeout)
throws InterruptedException, IOException {
Call call = new Call(param);
- Connection connection = getConnection(addr, protocol, ticket, call);
+ Connection connection = getConnection(
+ addr, protocol, ticket, rpcTimeout, call);
connection.sendParam(call); // send the parameter
boolean interrupted = false;
synchronized (call) {
@@ -1054,7 +1062,7 @@ public class Client {
ParallelCall call = new ParallelCall(params[i], results, i);
try {
Connection connection =
- getConnection(addresses[i], protocol, ticket, call);
+ getConnection(addresses[i], protocol, ticket, 0, call);
connection.sendParam(call); // send each parameter
} catch (IOException e) {
// log errors
@@ -1078,6 +1086,7 @@ public class Client {
private Connection getConnection(InetSocketAddress addr,
Class<?> protocol,
UserGroupInformation ticket,
+ int rpcTimeout,
Call call)
throws IOException, InterruptedException {
if (!running.get()) {
@@ -1089,7 +1098,8 @@ public class Client {
* connectionsId object and with set() method. We need to manage the
* refs for keys in HashMap properly. For now its ok.
*/
- ConnectionId remoteId = new ConnectionId(addr, protocol, ticket);
+ ConnectionId remoteId = new ConnectionId(
+ addr, protocol, ticket, rpcTimeout);
do {
synchronized (connections) {
connection = connections.get(remoteId);
@@ -1117,12 +1127,14 @@ public class Client {
UserGroupInformation ticket;
Class<?> protocol;
private static final int PRIME = 16777619;
+ private int rpcTimeout;
ConnectionId(InetSocketAddress address, Class<?> protocol,
- UserGroupInformation ticket) {
+ UserGroupInformation ticket, int rpcTimeout) {
this.protocol = protocol;
this.address = address;
this.ticket = ticket;
+ this.rpcTimeout = rpcTimeout;
}
InetSocketAddress getAddress() {
@@ -1137,6 +1149,9 @@ public class Client {
return ticket;
}
+ private int getRpcTimeout() {
+ return rpcTimeout;
+ }
@Override
public boolean equals(Object obj) {
@@ -1144,15 +1159,19 @@ public class Client {
ConnectionId id = (ConnectionId) obj;
return address.equals(id.address) && protocol == id.protocol &&
((ticket != null && ticket.equals(id.ticket)) ||
- (ticket == id.ticket));
+ (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout;
}
return false;
}
- @Override
+ @Override // simply use the default Object#hashcode() ?
public int hashCode() {
- return (address.hashCode() + PRIME * System.identityHashCode(protocol)) ^
- (ticket == null ? 0 : ticket.hashCode());
+ return (address.hashCode() + PRIME * (
+ PRIME * (
+ PRIME * System.identityHashCode(protocol) ^
+ System.identityHashCode(ticket)
+ ) ^ System.identityHashCode(rpcTimeout)
+ ));
}
}
}
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=982681&r1=982680&r2=982681&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java Thu Aug 5 16:39:10 2010
@@ -156,7 +156,7 @@ public class RPC {
}
public static Object waitForProxy(
- Class protocol,
+ Class<?> protocol,
long clientVersion,
InetSocketAddress addr,
Configuration conf
@@ -170,18 +170,37 @@ public class RPC {
* @param clientVersion client version
* @param addr remote address
* @param conf configuration to use
- * @param timeout time in milliseconds before giving up
+ * @param connTimeout time in milliseconds before giving up
* @return the proxy
* @throws IOException if the far end through a RemoteException
*/
- public static Object waitForProxy(Class protocol, long clientVersion,
+ public static Object waitForProxy(Class<?> protocol, long clientVersion,
InetSocketAddress addr, Configuration conf,
- long timeout) throws IOException {
+ long connTimeout) throws IOException {
+ return waitForProxy(protocol, clientVersion, addr, conf, 0, connTimeout);
+ }
+ /**
+ * Get a proxy connection to a remote server
+ * @param protocol protocol class
+ * @param clientVersion client version
+ * @param addr remote address
+ * @param conf configuration to use
+ * @param rpcTimeout timeout for each RPC
+ * @param timeout time in milliseconds before giving up
+ * @return the proxy
+ * @throws IOException if the far end through a RemoteException
+ */
+ public static Object waitForProxy(Class<?> protocol, long clientVersion,
+ InetSocketAddress addr, Configuration conf,
+ int rpcTimeout,
+ long timeout) throws IOException {
long startTime = System.currentTimeMillis();
IOException ioe;
while (true) {
try {
- return getProxy(protocol, clientVersion, addr, conf);
+ return getProxy(protocol, clientVersion, addr,
+ UserGroupInformation.getCurrentUser(), conf, NetUtils
+ .getDefaultSocketFactory(conf), rpcTimeout);
} catch(ConnectException se) { // namenode has not been started
LOG.info("Server at " + addr + " not available yet, Zzzzz...");
ioe = se;
@@ -208,7 +227,7 @@ public class RPC {
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */
- public static Object getProxy(Class protocol, long clientVersion,
+ public static Object getProxy(Class<?> protocol, long clientVersion,
InetSocketAddress addr, Configuration conf,
SocketFactory factory) throws IOException {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
@@ -217,16 +236,39 @@ public class RPC {
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */
- public static Object getProxy(Class protocol, long clientVersion,
+ public static Object getProxy(Class<?> protocol, long clientVersion,
+ InetSocketAddress addr,
+ UserGroupInformation ticket,
+ Configuration conf,
+ SocketFactory factory) throws IOException {
+ return getProxy(protocol, clientVersion, addr, ticket, conf, factory, 0);
+ }
+
+ /**
+ * Construct a client-side proxy that implements the named protocol,
+ * talking to a server at the named address.
+ *
+ * @param protocol protocol
+ * @param clientVersion client's version
+ * @param addr server address
+ * @param ticket security ticket
+ * @param conf configuration
+ * @param factory socket factory
+ * @param rpcTimeout max time for each rpc; 0 means no timeout
+ * @return the proxy
+ * @throws IOException if any error occurs
+ */
+ public static Object getProxy(Class<?> protocol, long clientVersion,
InetSocketAddress addr,
UserGroupInformation ticket,
Configuration conf,
- SocketFactory factory) throws IOException {
+ SocketFactory factory,
+ int rpcTimeout) throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
- return getProtocolEngine(protocol,conf)
- .getProxy(protocol, clientVersion, addr, ticket, conf, factory);
+ return getProtocolEngine(protocol,conf).getProxy(protocol,
+ clientVersion, addr, ticket, conf, factory, rpcTimeout);
}
/**
@@ -239,7 +281,7 @@ public class RPC {
* @return a proxy instance
* @throws IOException
*/
- public static Object getProxy(Class protocol, long clientVersion,
+ public static Object getProxy(Class<?> protocol, long clientVersion,
InetSocketAddress addr, Configuration conf)
throws IOException {
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java?rev=982681&r1=982680&r2=982681&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java Thu Aug 5 16:39:10 2010
@@ -32,10 +32,10 @@ import org.apache.hadoop.conf.Configurat
interface RpcEngine {
/** Construct a client-side proxy object. */
- Object getProxy(Class protocol,
+ Object getProxy(Class<?> protocol,
long clientVersion, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf,
- SocketFactory factory) throws IOException;
+ SocketFactory factory, int rpcTimeout) throws IOException;
/** Stop this proxy. */
void stopProxy(Object proxy);
@@ -46,7 +46,7 @@ interface RpcEngine {
throws IOException, InterruptedException;
/** Construct a server for a protocol implementation instance. */
- RPC.Server getServer(Class protocol, Object instance, String bindAddress,
+ RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress,
int port, int numHandlers, boolean verbose,
Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=982681&r1=982680&r2=982681&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java Thu Aug 5 16:39:10 2010
@@ -48,7 +48,7 @@ class WritableRpcEngine implements RpcEn
/** A method invocation, including the method name and its parameters.*/
private static class Invocation implements Writable, Configurable {
private String methodName;
- private Class[] parameterClasses;
+ private Class<?>[] parameterClasses;
private Object[] parameters;
private Configuration conf;
@@ -64,7 +64,7 @@ class WritableRpcEngine implements RpcEn
public String getMethodName() { return methodName; }
/** The parameter classes. */
- public Class[] getParameterClasses() { return parameterClasses; }
+ public Class<?>[] getParameterClasses() { return parameterClasses; }
/** The parameter instances. */
public Object[] getParameters() { return parameters; }
@@ -172,18 +172,21 @@ class WritableRpcEngine implements RpcEn
private static ClientCache CLIENTS=new ClientCache();
private static class Invoker implements InvocationHandler {
- private Class protocol;
+ private Class<?> protocol;
private InetSocketAddress address;
private UserGroupInformation ticket;
+ private int rpcTimeout;
private Client client;
private boolean isClosed = false;
- public Invoker(Class protocol,
+ public Invoker(Class<?> protocol,
InetSocketAddress address, UserGroupInformation ticket,
- Configuration conf, SocketFactory factory) {
+ Configuration conf, SocketFactory factory,
+ int rpcTimeout) {
this.protocol = protocol;
this.address = address;
this.ticket = ticket;
+ this.rpcTimeout = rpcTimeout;
this.client = CLIENTS.getClient(conf, factory);
}
@@ -197,7 +200,7 @@ class WritableRpcEngine implements RpcEn
ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), address,
- protocol, ticket);
+ protocol, ticket, rpcTimeout);
if (logDebug) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
@@ -216,14 +219,15 @@ class WritableRpcEngine implements RpcEn
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */
- public Object getProxy(Class protocol, long clientVersion,
+ public Object getProxy(Class<?> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket,
- Configuration conf, SocketFactory factory)
+ Configuration conf, SocketFactory factory,
+ int rpcTimeout)
throws IOException {
Object proxy = Proxy.newProxyInstance
(protocol.getClassLoader(), new Class[] { protocol },
- new Invoker(protocol, addr, ticket, conf, factory));
+ new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
if (proxy instanceof VersionedProtocol) {
long serverVersion = ((VersionedProtocol)proxy)
.getProtocolVersion(protocol.getName(), clientVersion);
@@ -276,7 +280,7 @@ class WritableRpcEngine implements RpcEn
/** Construct a server for a protocol implementation instance listening on a
* port and address. */
- public Server getServer(Class protocol,
+ public Server getServer(Class<?> protocol,
Object instance, String bindAddress, int port,
int numHandlers, boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager)
Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestIPC.java?rev=982681&r1=982680&r2=982681&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestIPC.java Thu Aug 5 16:39:10 2010
@@ -29,6 +29,7 @@ import java.util.Random;
import java.io.DataInput;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
import javax.net.SocketFactory;
import junit.framework.TestCase;
@@ -43,6 +44,7 @@ public class TestIPC extends TestCase {
final private static Configuration conf = new Configuration();
final static private int PING_INTERVAL = 1000;
+ final static private int MIN_SLEEP_TIME = 1000;
static {
Client.setPingInterval(conf, PING_INTERVAL);
@@ -66,8 +68,9 @@ public class TestIPC extends TestCase {
public Writable call(Class<?> protocol, Writable param, long receiveTime)
throws IOException {
if (sleep) {
+ // sleep a bit
try {
- Thread.sleep(RANDOM.nextInt(2*PING_INTERVAL)); // sleep a bit
+ Thread.sleep(RANDOM.nextInt(PING_INTERVAL) + MIN_SLEEP_TIME);
} catch (InterruptedException e) {}
}
return param; // echo param as result
@@ -91,7 +94,7 @@ public class TestIPC extends TestCase {
try {
LongWritable param = new LongWritable(RANDOM.nextLong());
LongWritable value =
- (LongWritable)client.call(param, server, null, null);
+ (LongWritable)client.call(param, server, null, null, 0);
if (!param.equals(value)) {
LOG.fatal("Call failed!");
failed = true;
@@ -142,6 +145,7 @@ public class TestIPC extends TestCase {
public void testSerial() throws Exception {
testSerial(3, false, 2, 5, 100);
+ testSerial(3, true, 2, 5, 10);
}
public void testSerial(int handlerCount, boolean handlerSleep,
@@ -219,7 +223,7 @@ public class TestIPC extends TestCase {
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
try {
client.call(new LongWritable(RANDOM.nextLong()),
- address, null, null);
+ address, null, null, 0);
fail("Expected an exception to have been thrown");
} catch (IOException e) {
String message = e.getMessage();
@@ -276,7 +280,7 @@ public class TestIPC extends TestCase {
Client client = new Client(LongErrorWritable.class, conf);
try {
client.call(new LongErrorWritable(RANDOM.nextLong()),
- addr, null, null);
+ addr, null, null, 0);
fail("Expected an exception to have been thrown");
} catch (IOException e) {
// check error
@@ -296,7 +300,7 @@ public class TestIPC extends TestCase {
Client client = new Client(LongRTEWritable.class, conf);
try {
client.call(new LongRTEWritable(RANDOM.nextLong()),
- addr, null, null);
+ addr, null, null, 0);
fail("Expected an exception to have been thrown");
} catch (IOException e) {
// check error
@@ -322,14 +326,34 @@ public class TestIPC extends TestCase {
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
try {
client.call(new LongWritable(RANDOM.nextLong()),
- address, null, null);
+ address, null, null, 0);
fail("Expected an exception to have been thrown");
} catch (IOException e) {
assertTrue(e.getMessage().contains("Injected fault"));
}
}
+ public void testIpcTimeout() throws Exception {
+ // start server
+ Server server = new TestServer(1, true);
+ InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ server.start();
+ // start client
+ Client client = new Client(LongWritable.class, conf);
+ // set timeout to be less than MIN_SLEEP_TIME
+ try {
+ client.call(new LongWritable(RANDOM.nextLong()),
+ addr, null, null, MIN_SLEEP_TIME/2);
+ fail("Expected an exception to have been thrown");
+ } catch (SocketTimeoutException e) {
+ LOG.info("Get a SocketTimeoutException ", e);
+ }
+ // set timeout to be bigger than 3*ping interval
+ client.call(new LongWritable(RANDOM.nextLong()),
+ addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME);
+ }
+
public static void main(String[] args) throws Exception {
//new TestIPC("test").testSerial(5, false, 2, 10, 1000);