Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/03/10 19:41:19 UTC

svn commit: r635642 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/ipc/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/ipc/

Author: dhruba
Date: Mon Mar 10 11:41:16 2008
New Revision: 635642

URL: http://svn.apache.org/viewvc?rev=635642&view=rev
Log:
HADOOP-2870.  DataNode and NameNode closes all connections while
shutting down. (Hairong Kuang via dhruba)


Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
    hadoop/core/trunk/src/java/org/apache/hadoop/ipc/RPC.java
    hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeDeath.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHDFSServerPorts.java
    hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java
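
Taken together, the patch makes RPC.stopProxy() the counterpart to RPC.getProxy():
proxies now pin a reference-counted, cached IPC client, and releasing the last
reference stops the client and closes its connections. A minimal usage sketch of
the lifecycle this establishes (FooProtocol and its ping() are hypothetical
stand-ins for a real VersionedProtocol such as ClientProtocol):

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;
    import org.apache.hadoop.ipc.VersionedProtocol;

    public class ProxyLifecycleSketch {
      interface FooProtocol extends VersionedProtocol {
        long versionID = 1L;
        void ping() throws IOException;
      }

      static void useAndRelease(InetSocketAddress addr, Configuration conf)
          throws IOException {
        FooProtocol proxy = (FooProtocol) RPC.getProxy(
            FooProtocol.class, FooProtocol.versionID, addr, conf);
        try {
          proxy.ping();           // ordinary RPC calls
        } finally {
          RPC.stopProxy(proxy);   // drops the shared client's reference count;
                                  // its connections close once the count is zero
        }
      }
    }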

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Mar 10 11:41:16 2008
@@ -157,6 +157,9 @@
     HADOOP-2943. Compression of intermediate map output causes failures
     in the merge. (cdouglas)
 
+    HADOOP-2870.  DataNode and NameNode closes all connections while
+    shutting down. (Hairong Kuang via dhruba)
+
 Release 0.16.1 - 2008-03-13
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon Mar 10 11:41:16 2008
@@ -59,7 +59,8 @@
   public static final Log LOG = LogFactory.getLog(DFSClient.class);
   static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
   private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
-  ClientProtocol namenode;
+  final ClientProtocol namenode;
+  final private ClientProtocol rpcNamenode;
   final UnixUserGroupInformation ugi;
   volatile boolean clientRunning = true;
   Random r = new Random();
@@ -77,9 +78,26 @@
    */
   private TreeMap<String, OutputStream> pendingCreates =
     new TreeMap<String, OutputStream>();
-    
-  static ClientProtocol createNamenode(
-      InetSocketAddress nameNodeAddr, Configuration conf)
+ 
+  static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
+      Configuration conf) throws IOException {
+    try {
+      return createNamenode(createRPCNamenode(nameNodeAddr, conf,
+        UnixUserGroupInformation.login(conf, true)));
+    } catch (LoginException e) {
+      throw (IOException)(new IOException().initCause(e));
+    }
+  }
+
+  private static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr,
+      Configuration conf, UnixUserGroupInformation ugi) 
+    throws IOException {
+    return (ClientProtocol)RPC.getProxy(ClientProtocol.class,
+        ClientProtocol.versionID, nameNodeAddr, ugi, conf,
+        NetUtils.getSocketFactory(conf, ClientProtocol.class));
+  }
+
+  private static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
     throws IOException {
     RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
         5, 200, TimeUnit.MILLISECONDS);
@@ -118,18 +136,8 @@
     methodNameToPolicyMap.put("getEditLogSize", methodPolicy);
     methodNameToPolicyMap.put("create", methodPolicy);
 
-    UserGroupInformation userInfo;
-    try {
-      userInfo = UnixUserGroupInformation.login(conf);
-    } catch (LoginException e) {
-      throw new IOException(e.getMessage());
-    }
-
     return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
-        RPC.getProxy(ClientProtocol.class,
-            ClientProtocol.versionID, nameNodeAddr, userInfo, conf,
-            NetUtils.getSocketFactory(conf, ClientProtocol.class)),
-        methodNameToPolicyMap);
+        rpcNamenode, methodNameToPolicyMap);
   }
         
   /** 
@@ -141,14 +149,16 @@
     this.socketTimeout = conf.getInt("dfs.socket.timeout", 
                                      FSConstants.READ_TIMEOUT);
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
-
+    
     try {
       this.ugi = UnixUserGroupInformation.login(conf, true);
     } catch (LoginException e) {
       throw (IOException)(new IOException().initCause(e));
     }
 
-    this.namenode = createNamenode(nameNodeAddr, conf);
+    this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
+    this.namenode = createNamenode(rpcNamenode);
+
     String taskId = conf.get("mapred.task.id");
     if (taskId != null) {
       this.clientName = "DFSClient_" + taskId; 
@@ -169,8 +179,8 @@
   }
     
   /**
-   * Close the file system, abadoning all of the leases and files being
-   * created.
+   * Close the file system, abandoning all of the leases and files being
+   * created and close connections to the namenode.
    */
   public void close() throws IOException {
     // synchronize in here so that we don't need to change the API
@@ -197,6 +207,9 @@
         leaseChecker.join();
       } catch (InterruptedException ie) {
       }
+      
+      // close connections to the namenode
+      RPC.stopProxy(rpcNamenode);
     }
   }
 

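The reason DFSClient now keeps both namenode and rpcNamenode is that
RPC.stopProxy() retrieves the proxy's InvocationHandler and casts it to RPC's
own Invoker; the retry wrapper produced by RetryProxy.create() has a different
handler, so close() must be handed the raw proxy. A condensed sketch of the
split (reusing the hypothetical FooProtocol from the earlier sketch, with a
single retry policy standing in for DFSClient's per-method policy map):

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.retry.RetryPolicies;
    import org.apache.hadoop.io.retry.RetryProxy;
    import org.apache.hadoop.ipc.RPC;

    class RawAndRetrySketch {
      private final FooProtocol rpcProxy;  // raw proxy: the one stopProxy understands
      private final FooProtocol proxy;     // retry wrapper: what callers invoke

      RawAndRetrySketch(InetSocketAddress addr, Configuration conf)
          throws IOException {
        rpcProxy = (FooProtocol) RPC.getProxy(
            FooProtocol.class, FooProtocol.versionID, addr, conf);
        proxy = (FooProtocol) RetryProxy.create(
            FooProtocol.class, rpcProxy, RetryPolicies.TRY_ONCE_THEN_FAIL);
      }

      void close() {
        RPC.stopProxy(rpcProxy);  // pass the raw proxy, not the retry wrapper
      }
    }
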
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Mon Mar 10 11:41:16 2008
@@ -488,6 +488,9 @@
         }
       }
     }
+    
+    RPC.stopProxy(namenode); // stop the RPC threads
+    
     if(upgradeManager != null)
       upgradeManager.shutdownUpgrade();
     if (blockScannerThread != null) {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java Mon Mar 10 11:41:16 2008
@@ -74,8 +74,21 @@
   private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
   private Thread connectionCullerThread;
   private SocketFactory socketFactory;           // how to create sockets
-  private boolean simulateError = false;         // unit tests
-
+  
+  private int refCount = 1;
+  
+  synchronized void incCount() {
+	  refCount++;
+  }
+  
+  synchronized void decCount() {
+    refCount--;
+  }
+  
+  synchronized boolean isZeroReference() {
+    return refCount==0;
+  }
+  
   /** A call waiting for a value. */
   private class Call {
     int id;                                       // call id
@@ -289,7 +302,6 @@
           } else {
             Writable value = (Writable)ReflectionUtils.newInstance(valueClass, conf);
             try {
-              waitForEndSimulation();
               readingCall = call;
               value.readFields(in);                 // read value
             } finally {
@@ -473,15 +485,48 @@
     this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
   }
  
+  /** Return the socket factory of this client
+   *
+   * @return this client's socket factory
+   */
+  SocketFactory getSocketFactory() {
+    return socketFactory;
+  }
+
   /** Stop all threads related to this client.  No further calls may be made
    * using this client. */
   public void stop() {
-    LOG.info("Stopping client");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Stopping client");
+    }
+    
+    if (running == false) {
+      return;
+    }
     running = false;
+
     connectionCullerThread.interrupt();
     try {
       connectionCullerThread.join();
     } catch(InterruptedException e) {}
+
+    // close and wake up all connections
+    synchronized (connections) {
+      for (Connection conn : connections.values()) {
+        synchronized (conn) {
+          conn.setCloseConnection();
+          conn.notifyAll();
+        }
+      }
+    }
+    
+    // wait until all connections are closed
+    while (!connections.isEmpty()) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+      }
+    }
   }
 
   /** Sets the timeout used for network i/o. */
@@ -614,19 +659,4 @@
       return address.hashCode() ^ System.identityHashCode(ticket);
     }
   }  
-
-  void simulateError(boolean flag) {
-    simulateError = flag;
-  }
- 
-  // If errors are being simulated, then wait.
-  private void waitForEndSimulation() {
-    while (simulateError) {
-      try {
-        LOG.info("RPC Client waiting for simulation to end");
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-      }
-    }
-  }
 }

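Since RPC's ClientCache (below) hands out one shared Client per SocketFactory,
a Client may have many concurrent users; the new refCount tracks them so stop()
runs only after the last user releases the client. The rewritten stop() also
wakes every connection and then polls connections.isEmpty() every 100 ms, a
simple (if busy-waiting) way to guarantee no connection thread outlives the
client. A standalone sketch of the counting idiom, with hypothetical names:

    // SharedResource is a stand-in for Client; the creator holds the first
    // reference, mirroring "private int refCount = 1" above.
    class SharedResource {
      private int refCount = 1;

      synchronized void incCount() { refCount++; }
      synchronized void decCount() { refCount--; }
      synchronized boolean isZeroReference() { return refCount == 0; }

      void stop() {
        // tear down threads and sockets; callers must ensure the
        // reference count is zero before invoking this
      }
    }
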
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/RPC.java Mon Mar 10 11:41:16 2008
@@ -133,63 +133,76 @@
 
   }
 
-  private static Map<SocketFactory, Client> CLIENTS =
+  /* Cache a client using its socket factory as the hash key */
+  static private class ClientCache {
+    private Map<SocketFactory, Client> clients =
       new HashMap<SocketFactory, Client>();
 
-  private static synchronized Client getClient(Configuration conf,
-      SocketFactory factory) {
-    // Construct & cache client.  The configuration is only used for timeout,
-    // and Clients have connection pools.  So we can either (a) lose some
-    // connection pooling and leak sockets, or (b) use the same timeout for all
-    // configurations.  Since the IPC is usually intended globally, not
-    // per-job, we choose (a).
-    Client client = CLIENTS.get(factory);
-    if (client == null) {
-      client = new Client(ObjectWritable.class, conf, factory);
-      CLIENTS.put(factory, client);
+    /**
+     * Construct & cache an IPC client with the user-provided SocketFactory 
+     * if no cached client exists.
+     * 
+     * @param conf Configuration
+     * @return an IPC client
+     */
+    private synchronized Client getClient(Configuration conf,
+        SocketFactory factory) {
+      // Construct & cache client.  The configuration is only used for timeout,
+      // and Clients have connection pools.  So we can either (a) lose some
+      // connection pooling and leak sockets, or (b) use the same timeout for all
+      // configurations.  Since the IPC is usually intended globally, not
+      // per-job, we choose (a).
+      Client client = clients.get(factory);
+      if (client == null) {
+        client = new Client(ObjectWritable.class, conf, factory);
+        clients.put(factory, client);
+      } else {
+        client.incCount();
+      }
+      return client;
     }
-    return client;
-  }
-  
-  /**
-   * Construct & cache client with the default SocketFactory.
-   * @param conf
-   * @return
-   */
-  private static Client getClient(Configuration conf) {
-    return getClient(conf, SocketFactory.getDefault());
-  }
 
-  /**
-   * Stop all RPC client connections
-   */
-  public static synchronized void stopClient(){
-    for (Client client : CLIENTS.values())
-      client.stop();
-    CLIENTS.clear();
-  }
-
-  /*
-   * remove specified client from the list of clients.
-   */
-  static synchronized void removeClients() {
-    CLIENTS.clear();
-  }
+    /**
+     * Construct & cache an IPC client with the default SocketFactory 
+     * if no cached client exists.
+     * 
+     * @param conf Configuration
+     * @return an IPC client
+     */
+    private synchronized Client getClient(Configuration conf) {
+      return getClient(conf, SocketFactory.getDefault());
+    }
 
-  static synchronized Collection allClients() {
-    return CLIENTS.values();
+    /**
+     * Stop a RPC client connection 
+     * A RPC client is closed only when its reference count becomes zero.
+     */
+    private void stopClient(Client client) {
+      synchronized (this) {
+        client.decCount();
+        if (client.isZeroReference()) {
+          clients.remove(client.getSocketFactory());
+        }
+      }
+      if (client.isZeroReference()) {
+        client.stop();
+      }
+    }
   }
 
+  private static ClientCache CLIENTS=new ClientCache();
+  
   private static class Invoker implements InvocationHandler {
     private InetSocketAddress address;
     private UserGroupInformation ticket;
     private Client client;
+    private boolean isClosed = false;
 
     public Invoker(InetSocketAddress address, UserGroupInformation ticket, 
                    Configuration conf, SocketFactory factory) {
       this.address = address;
       this.ticket = ticket;
-      this.client = getClient(conf, factory);
+      this.client = CLIENTS.getClient(conf, factory);
     }
 
     public Object invoke(Object proxy, Method method, Object[] args)
@@ -201,6 +214,14 @@
       LOG.debug("Call: " + method.getName() + " " + callTime);
       return value.get();
     }
+    
+    /* close the IPC client that's responsible for this invoker's RPCs */ 
+    synchronized private void close() {
+      if (!isClosed) {
+        isClosed = true;
+        CLIENTS.stopClient(client);
+      }
+    }
   }
 
   /**
@@ -236,7 +257,7 @@
     }
     
     /**
-     * Get the client's prefered version
+     * Get the client's preferred version
      */
     public long getClientVersion() {
       return clientVersion;
@@ -316,6 +337,16 @@
         .getDefaultSocketFactory(conf));
   }
 
+  /**
+   * Stop this proxy and release its invoker's resource
+   * @param proxy the proxy to be stopped
+   */
+  public static void stopProxy(VersionedProtocol proxy) {
+    if (proxy!=null) {
+      ((Invoker)Proxy.getInvocationHandler(proxy)).close();
+    }
+  }
+
   /** Expert: Make multiple, parallel calls to a set of servers. */
   public static Object[] call(Method method, Object[][] params,
                               InetSocketAddress[] addrs, Configuration conf)
@@ -324,7 +355,9 @@
     Invocation[] invocations = new Invocation[params.length];
     for (int i = 0; i < params.length; i++)
       invocations[i] = new Invocation(method, params[i]);
-    Writable[] wrappedValues = getClient(conf).call(invocations, addrs);
+    Client client = CLIENTS.getClient(conf);
+    try {
+    Writable[] wrappedValues = client.call(invocations, addrs);
     
     if (method.getReturnType() == Void.TYPE) {
       return null;
@@ -337,6 +370,9 @@
         values[i] = ((ObjectWritable)wrappedValues[i]).get();
     
     return values;
+    } finally {
+      CLIENTS.stopClient(client);
+    }
   }
 
   /** Construct a server for a protocol implementation instance listening on a

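Note how ClientCache.stopClient() decrements and unpublishes the client inside
the synchronized block but invokes the potentially slow Client.stop() after it,
so draining one client's connections does not block unrelated RPC traffic
through the cache. A slightly restructured sketch of that idiom (the patch
calls isZeroReference() twice instead of caching a local):

    // Condensed from ClientCache.stopClient above; "last" is a local added
    // here for clarity and is not in the patch.
    void stopClient(Client client) {
      boolean last;
      synchronized (this) {
        client.decCount();
        last = client.isZeroReference();
        if (last) {
          clients.remove(client.getSocketFactory()); // unpublish while locked
        }
      }
      if (last) {
        client.stop();  // may block while connections drain; done outside the lock
      }
    }
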
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java Mon Mar 10 11:41:16 2008
@@ -345,7 +345,11 @@
 
         selector= null;
         acceptChannel= null;
-        connectionList = null;
+        
+        // clean up all connections
+        while (!connectionList.isEmpty()) {
+          closeConnection(connectionList.remove(0));
+        }
       }
     }
 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Mon Mar 10 11:41:16 2008
@@ -317,6 +317,8 @@
   }
 
   JobSubmissionProtocol jobSubmitClient;
+  private JobSubmissionProtocol rpcProxy;
+  
   FileSystem fs = null;
 
   static Random r = new Random();
@@ -349,10 +351,17 @@
     if ("local".equals(tracker)) {
       this.jobSubmitClient = new LocalJobRunner(conf);
     } else {
-      this.jobSubmitClient = createProxy(JobTracker.getAddress(conf), conf);
+      this.rpcProxy = createRPCProxy(JobTracker.getAddress(conf), conf);
+      this.jobSubmitClient = createRetryProxy(this.rpcProxy);
     }        
   }
 
+  private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
+      Configuration conf) throws IOException {
+    return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
+        JobSubmissionProtocol.versionID, addr, conf,
+        NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
+  }
   /**
    * Create a proxy JobSubmissionProtocol that retries timeouts.
    * 
@@ -361,13 +370,8 @@
    * @return a proxy object that will retry timeouts.
    * @throws IOException
    */
-  private JobSubmissionProtocol createProxy(InetSocketAddress addr,
-                                            Configuration conf
+  private JobSubmissionProtocol createRetryProxy(JobSubmissionProtocol raw
                                             ) throws IOException {
-    JobSubmissionProtocol raw =
-        (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
-            JobSubmissionProtocol.versionID, addr, conf, NetUtils
-                .getSocketFactory(conf, JobSubmissionProtocol.class));
     RetryPolicy backoffPolicy =
       RetryPolicies.retryUpToMaximumCountWithProportionalSleep
       (5, 10, java.util.concurrent.TimeUnit.SECONDS);
@@ -388,13 +392,15 @@
    */
   public JobClient(InetSocketAddress jobTrackAddr, 
                    Configuration conf) throws IOException {
-    jobSubmitClient = createProxy(jobTrackAddr, conf);
+    rpcProxy =  createRPCProxy(jobTrackAddr, conf);
+    jobSubmitClient = createRetryProxy(rpcProxy);
   }
 
   /**
    * Close the <code>JobClient</code>.
    */
   public synchronized void close() throws IOException {
+    RPC.stopProxy(rpcProxy);
   }
 
   /**

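One subtlety in JobClient: when the job tracker is configured as "local",
jobSubmitClient is a LocalJobRunner and rpcProxy is never assigned, so close()
hands null to RPC.stopProxy(); that is safe because stopProxy (above) is an
explicit no-op on null. Condensed from the patch:

    // The two construction paths (field and method names from the patch):
    if ("local".equals(tracker)) {
      jobSubmitClient = new LocalJobRunner(conf);    // rpcProxy stays null
    } else {
      rpcProxy = createRPCProxy(JobTracker.getAddress(conf), conf);
      jobSubmitClient = createRetryProxy(rpcProxy);  // retry wrapper over raw proxy
    }
    // ... and on close():
    RPC.stopProxy(rpcProxy);  // no-op when rpcProxy == null (local mode)
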
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Mar 10 11:41:16 2008
@@ -767,6 +767,9 @@
         
     // Shutdown the fetcher thread
     this.mapEventsFetcher.interrupt();
+    
+    // shutdown RPC connections
+    RPC.stopProxy(jobClient);
   }
 
   /**
@@ -2078,6 +2081,7 @@
         throwable.printStackTrace(new PrintStream(baos));
         umbilical.reportDiagnosticInfo(taskid, baos.toString());
       } finally {
+        RPC.stopProxy(umbilical);
         MetricsContext metricsContext = MetricsUtil.getContext("mapred");
         metricsContext.close();
         // Shutting down log4j of the child-vm... 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeDeath.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeDeath.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeDeath.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeDeath.java Mon Mar 10 11:41:16 2008
@@ -331,7 +331,7 @@
   private void complexTest() throws IOException {
     Configuration conf = new Configuration();
     conf.setInt("heartbeat.recheck.interval", 2000);
-    conf.setInt("dfs.heartbeat.interval", 1);
+    conf.setInt("dfs.heartbeat.interval", 2);
     conf.setInt("dfs.replication.pending.timeout.sec", 2);
     conf.setInt("dfs.socket.timeout", 5000);
     MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, null);

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java Mon Mar 10 11:41:16 2008
@@ -256,6 +256,7 @@
     } finally {
       fs.close();
       cluster.shutdown();
+      client.close();
     }
   }
 
@@ -328,6 +329,7 @@
       } catch (Exception e) {
       }
       cluster.shutdown();
+      client.close();
     }
   }
 
@@ -336,6 +338,8 @@
    */
   public void testFileCreationNamenodeRestart() throws IOException {
     Configuration conf = new Configuration();
+    final int MAX_IDLE_TIME = 2000; // 2s
+    conf.setInt("ipc.client.connection.maxidletime", MAX_IDLE_TIME);
     conf.setInt("heartbeat.recheck.interval", 1000);
     conf.setInt("dfs.heartbeat.interval", 1);
     if (simulatedStorage) {
@@ -348,6 +352,7 @@
     int nnport = cluster.getNameNodePort();
     InetSocketAddress addr = new InetSocketAddress("localhost", nnport);
 
+    DFSClient client = null;
     try {
 
       // create a new file.
@@ -371,7 +376,7 @@
       // This ensures that leases are persisted in fsimage.
       cluster.shutdown();
       try {
-        Thread.sleep(5000);
+        Thread.sleep(2*MAX_IDLE_TIME);
       } catch (InterruptedException e) {
       }
       cluster = new MiniDFSCluster(nnport, conf, 1, false, true, 
@@ -400,7 +405,7 @@
       stm2.close();
 
       // verify that new block is associated with this file
-      DFSClient client = new DFSClient(addr, conf);
+      client = new DFSClient(addr, conf);
       LocatedBlocks locations = client.namenode.getBlockLocations(
                                   file1.toString(), 0, Long.MAX_VALUE);
       System.out.println("locations = " + locations.locatedBlockCount());
@@ -416,6 +421,7 @@
     } finally {
       fs.close();
       cluster.shutdown();
+      if (client != null)  client.close();
     }
   }
 
@@ -473,4 +479,5 @@
     testFileCreation();
     simulatedStorage = false;
   }
+
 }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHDFSServerPorts.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHDFSServerPorts.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHDFSServerPorts.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHDFSServerPorts.java Mon Mar 10 11:41:16 2008
@@ -66,7 +66,6 @@
     if (nn != null) {
       nn.stop();
     }
-    RPC.stopClient();
   }
 
   public Configuration getConfig() {

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java?rev=635642&r1=635641&r2=635642&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java Mon Mar 10 11:41:16 2008
@@ -25,8 +25,6 @@
 import junit.framework.TestCase;
 
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
 
 import org.apache.commons.logging.*;
 
@@ -54,6 +52,7 @@
     public static final long versionID = 1L;
     
     void ping() throws IOException;
+    void slowPing(boolean shouldSlow) throws IOException;
     String echo(String value) throws IOException;
     String[] echo(String[] value) throws IOException;
     Writable echo(Writable value) throws IOException;
@@ -65,13 +64,28 @@
   }
 
   public class TestImpl implements TestProtocol {
-
+    int fastPingCounter = 0;
+    
     public long getProtocolVersion(String protocol, long clientVersion) {
       return TestProtocol.versionID;
     }
     
     public void ping() {}
 
+    public synchronized void slowPing(boolean shouldSlow) {
+      if (shouldSlow) {
+        while (fastPingCounter < 2) {
+          try {
+          wait();  // slow response until two fast pings happened
+          } catch (InterruptedException ignored) {}
+        }
+        fastPingCounter -= 2;
+      } else {
+        fastPingCounter++;
+        notify();
+      }
+    }
+    
     public String echo(String value) throws IOException { return value; }
 
     public String[] echo(String[] values) throws IOException { return values; }
@@ -159,7 +173,7 @@
 
     public void run() {
       try {
-        proxy.ping();           // this would hang until simulateError is false
+        proxy.slowPing(true);   // this would hang until two fast pings happened
         done = true;
       } catch (IOException e) {
         assertTrue("SlowRPC ping exception " + e, false);
@@ -169,57 +183,58 @@
 
   public void testSlowRpc() throws Exception {
     System.out.println("Testing Slow RPC");
-    Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, conf);
+    // create a server with two handlers
+    Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, 2, false, conf);
+    TestProtocol proxy = null;
+    
+    try {
     server.start();
 
     InetSocketAddress addr = server.getListenerAddress();
 
-    // create a client and make an RPC that does not read its response
-    //
-    TestProtocol proxy1 =
-      (TestProtocol)RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
-    Collection collection = RPC.allClients();
-    assertTrue("There should be only one client.", collection.size() == 1);
-    Iterator iter = collection.iterator();
-    Client client = (Client) iter.next();
-
-    client.simulateError(true);
-    RPC.removeClients();
-    SlowRPC slowrpc = new SlowRPC(proxy1);
+    // create a client
+    proxy = (TestProtocol)RPC.getProxy(
+        TestProtocol.class, TestProtocol.versionID, addr, conf);
+
+    SlowRPC slowrpc = new SlowRPC(proxy);
     Thread thread = new Thread(slowrpc, "SlowRPC");
-    thread.start();
+    thread.start(); // send a slow RPC, which won't return until two fast pings
     assertTrue("Slow RPC should not have finished1.", !slowrpc.isDone());
 
-    // create another client and make another RPC to the same server. This
-    // should complete even though the first one is still hanging.
-    //
-    TestProtocol proxy2 =
-      (TestProtocol)RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
-    proxy2.ping();
-
+    proxy.slowPing(false); // first fast ping
+    
     // verify that the first RPC is still stuck
     assertTrue("Slow RPC should not have finished2.", !slowrpc.isDone());
 
-    // Make the first RPC process its response. 
-    client.simulateError(false);
+    proxy.slowPing(false); // second fast ping
+    
+    // Now the slow ping should be able to be executed
     while (!slowrpc.isDone()) {
       System.out.println("Waiting for slow RPC to get done.");
       try {
         Thread.sleep(1000);
       } catch (InterruptedException e) {}
     }
-    server.stop();
+    } finally {
+      server.stop();
+      if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
+      System.out.println("Down slow rpc testing");
+    }
   }
 
 
   public void testCalls() throws Exception {
     Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, conf);
+    TestProtocol proxy = null;
+    try {
     server.start();
 
     InetSocketAddress addr = server.getListenerAddress();
-    TestProtocol proxy =
-      (TestProtocol)RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
-    
+    proxy = (TestProtocol)RPC.getProxy(
+        TestProtocol.class, TestProtocol.versionID, addr, conf);
+      
     proxy.ping();
 
     String stringResult = proxy.echo("foo");
@@ -288,8 +303,10 @@
     Object[] voids = (Object[])RPC.call(ping, new Object[][]{{},{}},
                                         new InetSocketAddress[] {addr, addr}, conf);
     assertEquals(voids, null);
-
-    server.stop();
+    } finally {
+      server.stop();
+      if(proxy!=null) RPC.stopProxy(proxy);
+    }
   }
   public static void main(String[] args) throws Exception {
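
The slowPing()/fast-ping handshake above replaces the removed simulateError()
hook: the slow call parks in wait() until two fast pings have each bumped
fastPingCounter and notified the monitor. Note the test now asks RPC.getServer
for two handlers so the fast pings can be served while the slow one occupies
the other handler. A standalone sketch of the underlying monitor idiom (class
and method names hypothetical):

    // One caller blocks until two others have signalled, mirroring
    // TestImpl.slowPing(true) and slowPing(false) above.
    class TwoSignalGate {
      private int signals = 0;

      synchronized void await() throws InterruptedException {
        while (signals < 2) {
          wait();          // releases the monitor; rechecked on each wakeup
        }
        signals -= 2;      // consume both signals, like "fastPingCounter -= 2"
      }

      synchronized void signal() {
        signals++;
        notify();          // wake the waiter, as slowPing(false) does
      }
    }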