You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/03/10 19:41:19 UTC
svn commit: r635642 - in /hadoop/core/trunk: ./
src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/ipc/
src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/dfs/
src/test/org/apache/hadoop/ipc/
Author: dhruba
Date: Mon Mar 10 11:41:16 2008
New Revision: 635642
URL: http://svn.apache.org/viewvc?rev=635642&view=rev
Log:
HADOOP-2870. DataNode and NameNode close all connections while
shutting down. (Hairong Kuang via dhruba)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
hadoop/core/trunk/src/java/org/apache/hadoop/ipc/RPC.java
hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeDeath.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHDFSServerPorts.java
hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Mar 10 11:41:16 2008
@@ -157,6 +157,9 @@
HADOOP-2943. Compression of intermediate map output causes failures
in the merge. (cdouglas)
+ HADOOP-2870. DataNode and NameNode closes all connections while
+ shutting down. (Hairong Kuang via dhruba)
+
Release 0.16.1 - 2008-03-13
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon Mar 10 11:41:16 2008
@@ -59,7 +59,8 @@
public static final Log LOG = LogFactory.getLog(DFSClient.class);
static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
- ClientProtocol namenode;
+ final ClientProtocol namenode;
+ final private ClientProtocol rpcNamenode;
final UnixUserGroupInformation ugi;
volatile boolean clientRunning = true;
Random r = new Random();
@@ -77,9 +78,26 @@
*/
private TreeMap<String, OutputStream> pendingCreates =
new TreeMap<String, OutputStream>();
-
- static ClientProtocol createNamenode(
- InetSocketAddress nameNodeAddr, Configuration conf)
+
+ static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
+ Configuration conf) throws IOException {
+ try {
+ return createNamenode(createRPCNamenode(nameNodeAddr, conf,
+ UnixUserGroupInformation.login(conf, true)));
+ } catch (LoginException e) {
+ throw (IOException)(new IOException().initCause(e));
+ }
+ }
+
+ private static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr,
+ Configuration conf, UnixUserGroupInformation ugi)
+ throws IOException {
+ return (ClientProtocol)RPC.getProxy(ClientProtocol.class,
+ ClientProtocol.versionID, nameNodeAddr, ugi, conf,
+ NetUtils.getSocketFactory(conf, ClientProtocol.class));
+ }
+
+ private static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
throws IOException {
RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
5, 200, TimeUnit.MILLISECONDS);
@@ -118,18 +136,8 @@
methodNameToPolicyMap.put("getEditLogSize", methodPolicy);
methodNameToPolicyMap.put("create", methodPolicy);
- UserGroupInformation userInfo;
- try {
- userInfo = UnixUserGroupInformation.login(conf);
- } catch (LoginException e) {
- throw new IOException(e.getMessage());
- }
-
return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
- RPC.getProxy(ClientProtocol.class,
- ClientProtocol.versionID, nameNodeAddr, userInfo, conf,
- NetUtils.getSocketFactory(conf, ClientProtocol.class)),
- methodNameToPolicyMap);
+ rpcNamenode, methodNameToPolicyMap);
}
/**
@@ -141,14 +149,16 @@
this.socketTimeout = conf.getInt("dfs.socket.timeout",
FSConstants.READ_TIMEOUT);
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
-
+
try {
this.ugi = UnixUserGroupInformation.login(conf, true);
} catch (LoginException e) {
throw (IOException)(new IOException().initCause(e));
}
- this.namenode = createNamenode(nameNodeAddr, conf);
+ this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
+ this.namenode = createNamenode(rpcNamenode);
+
String taskId = conf.get("mapred.task.id");
if (taskId != null) {
this.clientName = "DFSClient_" + taskId;
@@ -169,8 +179,8 @@
}
/**
- * Close the file system, abadoning all of the leases and files being
- * created.
+ * Close the file system, abandoning all of the leases and files being
+ * created and close connections to the namenode.
*/
public void close() throws IOException {
// synchronize in here so that we don't need to change the API
@@ -197,6 +207,9 @@
leaseChecker.join();
} catch (InterruptedException ie) {
}
+
+ // close connections to the namenode
+ RPC.stopProxy(rpcNamenode);
}
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Mon Mar 10 11:41:16 2008
@@ -488,6 +488,9 @@
}
}
}
+
+ RPC.stopProxy(namenode); // stop the RPC threads
+
if(upgradeManager != null)
upgradeManager.shutdownUpgrade();
if (blockScannerThread != null) {
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java Mon Mar 10 11:41:16 2008
@@ -74,8 +74,21 @@
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
private Thread connectionCullerThread;
private SocketFactory socketFactory; // how to create sockets
- private boolean simulateError = false; // unit tests
-
+
+ private int refCount = 1;
+
+ synchronized void incCount() {
+ refCount++;
+ }
+
+ synchronized void decCount() {
+ refCount--;
+ }
+
+ synchronized boolean isZeroReference() {
+ return refCount==0;
+ }
+
/** A call waiting for a value. */
private class Call {
int id; // call id
@@ -289,7 +302,6 @@
} else {
Writable value = (Writable)ReflectionUtils.newInstance(valueClass, conf);
try {
- waitForEndSimulation();
readingCall = call;
value.readFields(in); // read value
} finally {
@@ -473,15 +485,48 @@
this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
}
+ /** Return the socket factory of this client
+ *
+ * @return this client's socket factory
+ */
+ SocketFactory getSocketFactory() {
+ return socketFactory;
+ }
+
/** Stop all threads related to this client. No further calls may be made
* using this client. */
public void stop() {
- LOG.info("Stopping client");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopping client");
+ }
+
+ if (running == false) {
+ return;
+ }
running = false;
+
connectionCullerThread.interrupt();
try {
connectionCullerThread.join();
} catch(InterruptedException e) {}
+
+ // close and wake up all connections
+ synchronized (connections) {
+ for (Connection conn : connections.values()) {
+ synchronized (conn) {
+ conn.setCloseConnection();
+ conn.notifyAll();
+ }
+ }
+ }
+
+ // wait until all connections are closed
+ while (!connections.isEmpty()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ }
+ }
}
/** Sets the timeout used for network i/o. */
@@ -614,19 +659,4 @@
return address.hashCode() ^ System.identityHashCode(ticket);
}
}
-
- void simulateError(boolean flag) {
- simulateError = flag;
- }
-
- // If errors are being simulated, then wait.
- private void waitForEndSimulation() {
- while (simulateError) {
- try {
- LOG.info("RPC Client waiting for simulation to end");
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- }
- }
- }
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/RPC.java Mon Mar 10 11:41:16 2008
@@ -133,63 +133,76 @@
}
- private static Map<SocketFactory, Client> CLIENTS =
+ /* Cache a client using its socket factory as the hash key */
+ static private class ClientCache {
+ private Map<SocketFactory, Client> clients =
new HashMap<SocketFactory, Client>();
- private static synchronized Client getClient(Configuration conf,
- SocketFactory factory) {
- // Construct & cache client. The configuration is only used for timeout,
- // and Clients have connection pools. So we can either (a) lose some
- // connection pooling and leak sockets, or (b) use the same timeout for all
- // configurations. Since the IPC is usually intended globally, not
- // per-job, we choose (a).
- Client client = CLIENTS.get(factory);
- if (client == null) {
- client = new Client(ObjectWritable.class, conf, factory);
- CLIENTS.put(factory, client);
+ /**
+ * Construct & cache an IPC client with the user-provided SocketFactory
+ * if no cached client exists.
+ *
+ * @param conf Configuration
+ * @return an IPC client
+ */
+ private synchronized Client getClient(Configuration conf,
+ SocketFactory factory) {
+ // Construct & cache client. The configuration is only used for timeout,
+ // and Clients have connection pools. So we can either (a) lose some
+ // connection pooling and leak sockets, or (b) use the same timeout for all
+ // configurations. Since the IPC is usually intended globally, not
+ // per-job, we choose (a).
+ Client client = clients.get(factory);
+ if (client == null) {
+ client = new Client(ObjectWritable.class, conf, factory);
+ clients.put(factory, client);
+ } else {
+ client.incCount();
+ }
+ return client;
}
- return client;
- }
-
- /**
- * Construct & cache client with the default SocketFactory.
- * @param conf
- * @return
- */
- private static Client getClient(Configuration conf) {
- return getClient(conf, SocketFactory.getDefault());
- }
- /**
- * Stop all RPC client connections
- */
- public static synchronized void stopClient(){
- for (Client client : CLIENTS.values())
- client.stop();
- CLIENTS.clear();
- }
-
- /*
- * remove specified client from the list of clients.
- */
- static synchronized void removeClients() {
- CLIENTS.clear();
- }
+ /**
+ * Construct & cache an IPC client with the default SocketFactory
+ * if no cached client exists.
+ *
+ * @param conf Configuration
+ * @return an IPC client
+ */
+ private synchronized Client getClient(Configuration conf) {
+ return getClient(conf, SocketFactory.getDefault());
+ }
- static synchronized Collection allClients() {
- return CLIENTS.values();
+ /**
+ * Stop a RPC client connection
+ * A RPC client is closed only when its reference count becomes zero.
+ */
+ private void stopClient(Client client) {
+ synchronized (this) {
+ client.decCount();
+ if (client.isZeroReference()) {
+ clients.remove(client.getSocketFactory());
+ }
+ }
+ if (client.isZeroReference()) {
+ client.stop();
+ }
+ }
}
+ private static ClientCache CLIENTS=new ClientCache();
+
private static class Invoker implements InvocationHandler {
private InetSocketAddress address;
private UserGroupInformation ticket;
private Client client;
+ private boolean isClosed = false;
public Invoker(InetSocketAddress address, UserGroupInformation ticket,
Configuration conf, SocketFactory factory) {
this.address = address;
this.ticket = ticket;
- this.client = getClient(conf, factory);
+ this.client = CLIENTS.getClient(conf, factory);
}
public Object invoke(Object proxy, Method method, Object[] args)
@@ -201,6 +214,14 @@
LOG.debug("Call: " + method.getName() + " " + callTime);
return value.get();
}
+
+ /* close the IPC client that's responsible for this invoker's RPCs */
+ synchronized private void close() {
+ if (!isClosed) {
+ isClosed = true;
+ CLIENTS.stopClient(client);
+ }
+ }
}
/**
@@ -236,7 +257,7 @@
}
/**
- * Get the client's prefered version
+ * Get the client's preferred version
*/
public long getClientVersion() {
return clientVersion;
@@ -316,6 +337,16 @@
.getDefaultSocketFactory(conf));
}
+ /**
+ * Stop this proxy and release its invoker's resource
+ * @param proxy the proxy to be stopped
+ */
+ public static void stopProxy(VersionedProtocol proxy) {
+ if (proxy!=null) {
+ ((Invoker)Proxy.getInvocationHandler(proxy)).close();
+ }
+ }
+
/** Expert: Make multiple, parallel calls to a set of servers. */
public static Object[] call(Method method, Object[][] params,
InetSocketAddress[] addrs, Configuration conf)
@@ -324,7 +355,9 @@
Invocation[] invocations = new Invocation[params.length];
for (int i = 0; i < params.length; i++)
invocations[i] = new Invocation(method, params[i]);
- Writable[] wrappedValues = getClient(conf).call(invocations, addrs);
+ Client client = CLIENTS.getClient(conf);
+ try {
+ Writable[] wrappedValues = client.call(invocations, addrs);
if (method.getReturnType() == Void.TYPE) {
return null;
@@ -337,6 +370,9 @@
values[i] = ((ObjectWritable)wrappedValues[i]).get();
return values;
+ } finally {
+ CLIENTS.stopClient(client);
+ }
}
/** Construct a server for a protocol implementation instance listening on a
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java Mon Mar 10 11:41:16 2008
@@ -345,7 +345,11 @@
selector= null;
acceptChannel= null;
- connectionList = null;
+
+ // clean up all connections
+ while (!connectionList.isEmpty()) {
+ closeConnection(connectionList.remove(0));
+ }
}
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Mon Mar 10 11:41:16 2008
@@ -317,6 +317,8 @@
}
JobSubmissionProtocol jobSubmitClient;
+ private JobSubmissionProtocol rpcProxy;
+
FileSystem fs = null;
static Random r = new Random();
@@ -349,10 +351,17 @@
if ("local".equals(tracker)) {
this.jobSubmitClient = new LocalJobRunner(conf);
} else {
- this.jobSubmitClient = createProxy(JobTracker.getAddress(conf), conf);
+ this.rpcProxy = createRPCProxy(JobTracker.getAddress(conf), conf);
+ this.jobSubmitClient = createRetryProxy(this.rpcProxy);
}
}
+ private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
+ Configuration conf) throws IOException {
+ return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
+ JobSubmissionProtocol.versionID, addr, conf,
+ NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
+ }
/**
* Create a proxy JobSubmissionProtocol that retries timeouts.
*
@@ -361,13 +370,8 @@
* @return a proxy object that will retry timeouts.
* @throws IOException
*/
- private JobSubmissionProtocol createProxy(InetSocketAddress addr,
- Configuration conf
+ private JobSubmissionProtocol createRetryProxy(JobSubmissionProtocol raw
) throws IOException {
- JobSubmissionProtocol raw =
- (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
- JobSubmissionProtocol.versionID, addr, conf, NetUtils
- .getSocketFactory(conf, JobSubmissionProtocol.class));
RetryPolicy backoffPolicy =
RetryPolicies.retryUpToMaximumCountWithProportionalSleep
(5, 10, java.util.concurrent.TimeUnit.SECONDS);
@@ -388,13 +392,15 @@
*/
public JobClient(InetSocketAddress jobTrackAddr,
Configuration conf) throws IOException {
- jobSubmitClient = createProxy(jobTrackAddr, conf);
+ rpcProxy = createRPCProxy(jobTrackAddr, conf);
+ jobSubmitClient = createRetryProxy(rpcProxy);
}
/**
* Close the <code>JobClient</code>.
*/
public synchronized void close() throws IOException {
+ RPC.stopProxy(rpcProxy);
}
/**
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Mar 10 11:41:16 2008
@@ -767,6 +767,9 @@
// Shutdown the fetcher thread
this.mapEventsFetcher.interrupt();
+
+ // shutdown RPC connections
+ RPC.stopProxy(jobClient);
}
/**
@@ -2078,6 +2081,7 @@
throwable.printStackTrace(new PrintStream(baos));
umbilical.reportDiagnosticInfo(taskid, baos.toString());
} finally {
+ RPC.stopProxy(umbilical);
MetricsContext metricsContext = MetricsUtil.getContext("mapred");
metricsContext.close();
// Shutting down log4j of the child-vm...
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeDeath.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeDeath.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeDeath.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeDeath.java Mon Mar 10 11:41:16 2008
@@ -331,7 +331,7 @@
private void complexTest() throws IOException {
Configuration conf = new Configuration();
conf.setInt("heartbeat.recheck.interval", 2000);
- conf.setInt("dfs.heartbeat.interval", 1);
+ conf.setInt("dfs.heartbeat.interval", 2);
conf.setInt("dfs.replication.pending.timeout.sec", 2);
conf.setInt("dfs.socket.timeout", 5000);
MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, null);
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java Mon Mar 10 11:41:16 2008
@@ -256,6 +256,7 @@
} finally {
fs.close();
cluster.shutdown();
+ client.close();
}
}
@@ -328,6 +329,7 @@
} catch (Exception e) {
}
cluster.shutdown();
+ client.close();
}
}
@@ -336,6 +338,8 @@
*/
public void testFileCreationNamenodeRestart() throws IOException {
Configuration conf = new Configuration();
+ final int MAX_IDLE_TIME = 2000; // 2s
+ conf.setInt("ipc.client.connection.maxidletime", MAX_IDLE_TIME);
conf.setInt("heartbeat.recheck.interval", 1000);
conf.setInt("dfs.heartbeat.interval", 1);
if (simulatedStorage) {
@@ -348,6 +352,7 @@
int nnport = cluster.getNameNodePort();
InetSocketAddress addr = new InetSocketAddress("localhost", nnport);
+ DFSClient client = null;
try {
// create a new file.
@@ -371,7 +376,7 @@
// This ensures that leases are persisted in fsimage.
cluster.shutdown();
try {
- Thread.sleep(5000);
+ Thread.sleep(2*MAX_IDLE_TIME);
} catch (InterruptedException e) {
}
cluster = new MiniDFSCluster(nnport, conf, 1, false, true,
@@ -400,7 +405,7 @@
stm2.close();
// verify that new block is associated with this file
- DFSClient client = new DFSClient(addr, conf);
+ client = new DFSClient(addr, conf);
LocatedBlocks locations = client.namenode.getBlockLocations(
file1.toString(), 0, Long.MAX_VALUE);
System.out.println("locations = " + locations.locatedBlockCount());
@@ -416,6 +421,7 @@
} finally {
fs.close();
cluster.shutdown();
+ if (client != null) client.close();
}
}
@@ -473,4 +479,5 @@
testFileCreation();
simulatedStorage = false;
}
+
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHDFSServerPorts.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHDFSServerPorts.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHDFSServerPorts.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHDFSServerPorts.java Mon Mar 10 11:41:16 2008
@@ -66,7 +66,6 @@
if (nn != null) {
nn.stop();
}
- RPC.stopClient();
}
public Configuration getConfig() {
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java Mon Mar 10 11:41:16 2008
@@ -25,8 +25,6 @@
import junit.framework.TestCase;
import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
import org.apache.commons.logging.*;
@@ -54,6 +52,7 @@
public static final long versionID = 1L;
void ping() throws IOException;
+ void slowPing(boolean shouldSlow) throws IOException;
String echo(String value) throws IOException;
String[] echo(String[] value) throws IOException;
Writable echo(Writable value) throws IOException;
@@ -65,13 +64,28 @@
}
public class TestImpl implements TestProtocol {
-
+ int fastPingCounter = 0;
+
public long getProtocolVersion(String protocol, long clientVersion) {
return TestProtocol.versionID;
}
public void ping() {}
+ public synchronized void slowPing(boolean shouldSlow) {
+ if (shouldSlow) {
+ while (fastPingCounter < 2) {
+ try {
+ wait(); // slow response until two fast pings happened
+ } catch (InterruptedException ignored) {}
+ }
+ fastPingCounter -= 2;
+ } else {
+ fastPingCounter++;
+ notify();
+ }
+ }
+
public String echo(String value) throws IOException { return value; }
public String[] echo(String[] values) throws IOException { return values; }
@@ -159,7 +173,7 @@
public void run() {
try {
- proxy.ping(); // this would hang until simulateError is false
+ proxy.slowPing(true); // this would hang until two fast pings happened
done = true;
} catch (IOException e) {
assertTrue("SlowRPC ping exception " + e, false);
@@ -169,57 +183,58 @@
public void testSlowRpc() throws Exception {
System.out.println("Testing Slow RPC");
- Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, conf);
+ // create a server with two handlers
+ Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, 2, false, conf);
+ TestProtocol proxy = null;
+
+ try {
server.start();
InetSocketAddress addr = server.getListenerAddress();
- // create a client and make an RPC that does not read its response
- //
- TestProtocol proxy1 =
- (TestProtocol)RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
- Collection collection = RPC.allClients();
- assertTrue("There should be only one client.", collection.size() == 1);
- Iterator iter = collection.iterator();
- Client client = (Client) iter.next();
-
- client.simulateError(true);
- RPC.removeClients();
- SlowRPC slowrpc = new SlowRPC(proxy1);
+ // create a client
+ proxy = (TestProtocol)RPC.getProxy(
+ TestProtocol.class, TestProtocol.versionID, addr, conf);
+
+ SlowRPC slowrpc = new SlowRPC(proxy);
Thread thread = new Thread(slowrpc, "SlowRPC");
- thread.start();
+ thread.start(); // send a slow RPC, which won't return until two fast pings
assertTrue("Slow RPC should not have finished1.", !slowrpc.isDone());
- // create another client and make another RPC to the same server. This
- // should complete even though the first one is still hanging.
- //
- TestProtocol proxy2 =
- (TestProtocol)RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
- proxy2.ping();
-
+ proxy.slowPing(false); // first fast ping
+
// verify that the first RPC is still stuck
assertTrue("Slow RPC should not have finished2.", !slowrpc.isDone());
- // Make the first RPC process its response.
- client.simulateError(false);
+ proxy.slowPing(false); // second fast ping
+
+ // Now the slow ping should be able to be executed
while (!slowrpc.isDone()) {
System.out.println("Waiting for slow RPC to get done.");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
}
- server.stop();
+ } finally {
+ server.stop();
+ if (proxy != null) {
+ RPC.stopProxy(proxy);
+ }
+ System.out.println("Down slow rpc testing");
+ }
}
public void testCalls() throws Exception {
Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, conf);
+ TestProtocol proxy = null;
+ try {
server.start();
InetSocketAddress addr = server.getListenerAddress();
- TestProtocol proxy =
- (TestProtocol)RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
-
+ proxy = (TestProtocol)RPC.getProxy(
+ TestProtocol.class, TestProtocol.versionID, addr, conf);
+
proxy.ping();
String stringResult = proxy.echo("foo");
@@ -288,8 +303,10 @@
Object[] voids = (Object[])RPC.call(ping, new Object[][]{{},{}},
new InetSocketAddress[] {addr, addr}, conf);
assertEquals(voids, null);
-
- server.stop();
+ } finally {
+ server.stop();
+ if(proxy!=null) RPC.stopProxy(proxy);
+ }
}
public static void main(String[] args) throws Exception {