You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2007/12/13 02:04:48 UTC

svn commit: r603795 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/ipc/Client.java src/java/org/apache/hadoop/ipc/RPC.java src/java/org/apache/hadoop/ipc/Server.java src/test/org/apache/hadoop/ipc/TestRPC.java

Author: dhruba
Date: Wed Dec 12 17:04:47 2007
New Revision: 603795

URL: http://svn.apache.org/viewvc?rev=603795&view=rev
Log:
HADOOP-1841. Prevent slow clients from consuming threads in the NameNode.
(dhruba)


Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=603795&r1=603794&r2=603795&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Dec 12 17:04:47 2007
@@ -238,6 +238,9 @@
     HADOOP-2359.  Remove warning for interruptted exception when closing down
     minidfs. (dhruba via omalley)
 
+    HADOOP-1841. Prevent slow clients from consuming threads in the NameNode. 
+    (dhruba)
+
 Branch 0.15 (unreleased)
 
   BUG FIXES

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=603795&r1=603794&r2=603795&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java Wed Dec 12 17:04:47 2007
@@ -73,6 +73,7 @@
   private int maxRetries; //the max. no. of retries for socket connections
   private Thread connectionCullerThread;
   private SocketFactory socketFactory;           // how to create sockets
+  private boolean simulateError = false;         // unit tests
 
   /** A call waiting for a value. */
   private class Call {
@@ -286,6 +287,7 @@
           } else {
             Writable value = (Writable)ReflectionUtils.newInstance(valueClass, conf);
             try {
+              waitForEndSimulation();
               readingCall = call;
               value.readFields(in);                 // read value
             } finally {
@@ -609,4 +611,19 @@
       return address.hashCode() ^ System.identityHashCode(ticket);
     }
   }  
+
+  // Guards the simulateError flag: the flag is written by a test thread and
+  // read by the thread that reads responses, and the field is not volatile.
+  private final Object simulationLock = new Object();
+
+  /**
+   * Turns the error simulation used by unit tests on or off. While it is on,
+   * any thread that reaches waitForEndSimulation() pauses there.
+   *
+   * @param flag true to start pausing, false to let paused readers continue
+   */
+  void simulateError(boolean flag) {
+    synchronized (simulationLock) {
+      simulateError = flag;
+      simulationLock.notifyAll();   // wake a waiting reader right away
+    }
+  }
+ 
+  /**
+   * If errors are being simulated, waits until the simulation ends.
+   * The flag is read under simulationLock so that the update made by
+   * simulateError(false) is guaranteed to become visible here; an
+   * unsynchronized read of a non-volatile field in a loop is not.
+   * If the waiting thread is interrupted, its interrupt status is
+   * restored and the wait ends early.
+   */
+  private void waitForEndSimulation() {
+    synchronized (simulationLock) {
+      while (simulateError) {
+        LOG.info("RPC Client waiting for simulation to end");
+        try {
+          simulationLock.wait(1000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+    }
+  }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=603795&r1=603794&r2=603795&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java Wed Dec 12 17:04:47 2007
@@ -30,6 +30,7 @@
 import java.io.*;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.Collection;
 
 import javax.net.SocketFactory;
 
@@ -168,6 +169,17 @@
     CLIENTS.clear();
   }
 
+  /**
+   * Removes every client from the CLIENTS cache. The clients themselves are
+   * not stopped; a caller that still holds a reference to one is responsible
+   * for it. Intended for unit tests.
+   */
+  static synchronized void removeClients() {
+    CLIENTS.clear();
+  }
+
+  /**
+   * Returns a snapshot of the cached clients, for unit tests.
+   * The copy is made while holding the RPC class lock, so callers can
+   * iterate it after this method returns without touching the live,
+   * unsynchronized CLIENTS map.
+   */
+  static synchronized Collection allClients() {
+    return new java.util.ArrayList<Object>(CLIENTS.values());
+  }
+
   private static class Invoker implements InvocationHandler {
     private InetSocketAddress address;
     private UserGroupInformation ticket;
@@ -415,5 +427,4 @@
       value = value.substring(0, 55)+"...";
     LOG.info(value);
   }
-  
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=603795&r1=603794&r2=603795&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Wed Dec 12 17:04:47 2007
@@ -23,6 +23,7 @@
 import java.io.DataOutputStream;
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
@@ -44,12 +45,13 @@
 import java.util.Iterator;
 import java.util.Random;
 
-import org.apache.commons.logging.*;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.ipc.SocketChannelOutputStream;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.*;
@@ -145,6 +147,7 @@
   private int timeout;
   private long maxCallStartAge;
   private int maxQueueSize;
+  private int socketSendBufferSize;
 
   volatile private boolean running = true;         // true while server runs
   private LinkedList<Call> callQueue = new LinkedList<Call>(); // queued calls
@@ -154,6 +157,7 @@
   //maintain a list
   //of client connections
   private Listener listener = null;
+  private Responder responder = null;
   private int numConnections = 0;
   private Handler[] handlers = null;
 
@@ -191,17 +195,24 @@
     private Writable param;                       // the parameter passed
     private Connection connection;                // connection to client
     private long receivedTime;                    // the time received
+    private ByteBuffer response;                      // the response for this call
 
     public Call(int id, Writable param, Connection connection) {
       this.id = id;
       this.param = param;
       this.connection = connection;
       this.receivedTime = System.currentTimeMillis();
+      this.response = null;
     }
     
+    @Override
     public String toString() {
       return param.toString() + " from " + connection.toString();
     }
+
+    /**
+     * Stores the serialized reply for this call. The Responder writes it to
+     * the client's channel; the buffer's position records how much of it
+     * has been sent so far.
+     *
+     * @param response the complete response bytes
+     */
+    public void setResponse(ByteBuffer response) { this.response = response; }
   }
 
   /** Listens on the socket. Creates jobs for the handler threads*/
@@ -288,6 +299,7 @@
       }
     }
 
+    @Override
     public void run() {
       LOG.info(getName() + ": starting");
       SERVER.set(Server.this);
@@ -428,6 +440,234 @@
     }
   }
 
+  /**
+   * Sends the responses of RPCs back to clients.
+   *
+   * A handler thread first tries to write a response directly, from
+   * doRespond(). If the socket does not accept the whole response, the
+   * channel is registered with this thread's write selector, and the rest is
+   * written here whenever the channel becomes writable. This way a slow
+   * client does not hold on to a handler thread. Queued responses for calls
+   * received more than maxCallStartAge ago are purged periodically.
+   */
+  private class Responder extends Thread {
+    private Selector writeSelector;
+    // Number of threads currently registering a channel with writeSelector.
+    // This is a counter, not a flag. With a flag, one handler finishing its
+    // registration would clear it while another is still registering. This
+    // thread would then re-enter select() and stall that handler (which holds
+    // its responseQueue lock) until the select times out.
+    private int pending;
+
+    Responder() throws IOException {
+      this.setName("IPC Server Responder");
+      this.setDaemon(true);
+      writeSelector = Selector.open(); // create a selector
+      pending = 0;
+    }
+
+    @Override
+    public void run() {
+      LOG.info(getName() + ": starting");
+      SERVER.set(Server.this);
+      long lastPurgeTime = 0;   // last check for old calls.
+
+      while (running) {
+        try {
+          waitPending();     // If a channel is being registered, wait.
+          writeSelector.select(maxCallStartAge);
+          Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
+          while (iter.hasNext()) {
+            SelectionKey key = iter.next();
+            iter.remove();
+            try {
+              if (key.isValid() && key.isWritable()) {
+                doAsyncWrite(key);
+              }
+            } catch (IOException e) {
+              LOG.info(getName() + ": doAsyncWrite threw exception " + e);
+              key.cancel();
+            }
+          }
+          long now = System.currentTimeMillis();
+          if (now < lastPurgeTime + maxCallStartAge) {
+            continue;
+          }
+          lastPurgeTime = now;
+          //
+          // If there were some calls that have not been sent out for a
+          // long time, discard them.
+          //
+          LOG.debug("Checking for old call responses.");
+          for (SelectionKey key : snapshotKeys()) {
+            try {
+              doPurge(key, now);
+            } catch (IOException e) {
+              LOG.warn("Error in purging old calls " + e);
+            }
+          }
+        } catch (OutOfMemoryError e) {
+          //
+          // we can run out of memory if we have too many threads
+          // log the event and sleep for a minute and give
+          // some thread(s) a chance to finish
+          //
+          LOG.warn("Out of Memory in server select", e);
+          try { Thread.sleep(60000); } catch (Exception ie) {}
+        } catch (Exception e) {
+          LOG.warn("Exception in Responder " + e);
+        }
+      }
+      LOG.info("Stopping " + this.getName());
+    }
+
+    // Copies the selector's registered keys. Handler threads may register
+    // channels at the same time, and the key set is not thread-safe, so the
+    // copy is taken while synchronizing on the set itself, as the Selector
+    // documentation asks. The caller then purges from the copy without
+    // holding that lock, so the set's lock is never held while a
+    // responseQueue lock is taken.
+    private SelectionKey[] snapshotKeys() {
+      java.util.Set<SelectionKey> keys = writeSelector.keys();
+      synchronized (keys) {
+        return keys.toArray(new SelectionKey[keys.size()]);
+      }
+    }
+
+    // Continues sending the responses queued for the channel of a writable
+    // key. The key is cancelled once nothing is left to send.
+    private void doAsyncWrite(SelectionKey key) throws IOException {
+      Call call = (Call)key.attachment();
+      if (call == null) {
+        return;
+      }
+      if (key.channel() != call.connection.channel) {
+        throw new IOException("doAsyncWrite: bad channel");
+      }
+      if (processResponse(call.connection.responseQueue)) {
+        key.cancel();          // remove item from selector.
+      }
+    }
+
+    //
+    // Remove calls that have been pending in the responseQueue
+    // for a long time.
+    //
+    private void doPurge(SelectionKey key, long now) throws IOException {
+      Call call = (Call)key.attachment();
+      if (call == null) {
+        return;
+      }
+      if (key.channel() != call.connection.channel) {
+        LOG.info("doPurge: bad channel");
+        return;
+      }
+      LinkedList<Call> responseQueue = call.connection.responseQueue;
+      synchronized (responseQueue) {
+        Iterator<Call> iter = responseQueue.listIterator(0);
+        while (iter.hasNext()) {
+          Call queued = iter.next();
+          // Responses are written back to back on one stream, with no length
+          // prefix. A response that is already partly on the wire has to be
+          // finished; dropping it would make the client misread everything
+          // that follows.
+          if (queued.response != null && queued.response.position() > 0) {
+            continue;
+          }
+          if (now > queued.receivedTime + maxCallStartAge) {
+            LOG.info(getName() + ", call " + queued +
+                     ": response discarded for being too old (" +
+                     (now - queued.receivedTime) + ")");
+            iter.remove();
+          }
+        }
+
+        // If all the calls for this channel were removed, then
+        // remove this channel from the selector
+        if (responseQueue.size() == 0) {
+          key.cancel();
+        }
+      }
+    }
+
+    // Writes as much of the first queued response as the socket accepts
+    // without blocking. If the response does not fit, it goes back to the
+    // head of the queue and the channel is registered with writeSelector.
+    // Returns true if there is no more pending data for this channel.
+    // On an error, the connection is removed and closed, and the exception
+    // propagates to the caller.
+    //
+    private boolean processResponse(LinkedList<Call> responseQueue) throws IOException {
+      boolean error = true;
+      boolean done = false;       // true once nothing is left for this channel
+      int numElements = 0;
+      Call call = null;
+      try {
+        synchronized (responseQueue) {
+          //
+          // If there are no items for this channel, then we are done
+          //
+          numElements = responseQueue.size();
+          if (numElements == 0) {
+            error = false;
+            return true;              // no more data for this channel.
+          }
+          //
+          // Extract the first call
+          //
+          call = responseQueue.removeFirst();
+          SocketChannel channel = call.connection.channel;
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(getName() + ": responding to #" + call.id + " from " +
+                      call.connection);
+          }
+          //
+          // Send as much data as we can in the non-blocking fashion
+          //
+          int numBytes = channel.write(call.response);
+          if (!call.response.hasRemaining()) {
+            // Done only if this was the last queued call; otherwise more
+            // calls are still waiting to be sent.
+            done = (numElements == 1);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(getName() + ": responding to #" + call.id + " from " +
+                        call.connection + " Wrote " + numBytes + " bytes.");
+            }
+          } else {
+            //
+            // If we were unable to write the entire response out, then
+            // insert in Selector queue.
+            //
+            responseQueue.addFirst(call);
+            incPending();
+            try {
+              // Wakeup the thread blocked on select, only then can the call
+              // to channel.register() complete.
+              writeSelector.wakeup();
+              SelectionKey writeKey = channel.register(writeSelector,
+                                                       SelectionKey.OP_WRITE);
+              writeKey.attach(call);
+            } finally {
+              decPending();
+            }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(getName() + ": responding to #" + call.id + " from " +
+                        call.connection + " Wrote partial " + numBytes +
+                        " bytes.");
+            }
+            done = false;             // this call not fully processed.
+          }
+          error = false;              // everything went off well
+        }
+      } finally {
+        if (error && call != null) {
+          LOG.warn(getName()+", call " + call + ": output error");
+          done = true;               // error. no more data for this channel.
+          synchronized (connectionList) {
+            if (connectionList.remove(call.connection))
+              numConnections--;
+          }
+          call.connection.close();
+        }
+      }
+      return done;
+    }
+
+    //
+    // Enqueue a response from the application. If nothing else is queued for
+    // this connection, try to send it immediately on the calling thread.
+    //
+    void doRespond(Call call) throws IOException {
+      synchronized (call.connection.responseQueue) {
+        call.connection.responseQueue.addLast(call);
+        if (call.connection.responseQueue.size() == 1) {
+          processResponse(call.connection.responseQueue);
+        }
+      }
+    }
+
+    private synchronized void incPending() {   // a channel is being registered
+      pending++;
+    }
+
+    private synchronized void decPending() {   // a registration has finished
+      pending--;
+      notify();
+    }
+
+    // Blocks the responder while any thread is registering a channel, so
+    // that select() does not start while a register() call is in progress.
+    private synchronized void waitPending() throws InterruptedException {
+      while (pending > 0) {
+        wait();
+      }
+    }
+  }
+
   /** Reads calls from a connection and queues them for handling. */
   private class Connection {
     private boolean versionRead = false; //if initial signature and
@@ -438,8 +678,7 @@
     private SelectionKey key;
     private ByteBuffer data;
     private ByteBuffer dataLengthBuffer;
-    private DataOutputStream out;
-    private SocketChannelOutputStream channelOut;
+    private LinkedList<Call> responseQueue;
     private long lastContact;
     private int dataLength;
     private Socket socket;
@@ -457,9 +696,6 @@
       this.data = null;
       this.dataLengthBuffer = ByteBuffer.allocate(4);
       this.socket = channel.socket();
-      this.out = new DataOutputStream
-        (new BufferedOutputStream(
-          this.channelOut = new SocketChannelOutputStream(channel)));
       InetAddress addr = socket.getInetAddress();
       if (addr == null) {
         this.hostAddress = "*Unknown*";
@@ -467,8 +703,18 @@
         this.hostAddress = addr.getHostAddress();
       }
       this.remotePort = socket.getPort();
+      this.responseQueue = new LinkedList<Call>();
+      if (socketSendBufferSize != 0) {
+        try {
+          socket.setSendBufferSize(socketSendBufferSize);
+        } catch (IOException e) {
+          LOG.warn("Connection: unable to set socket send buffer size to " +
+                   socketSendBufferSize);
+        }
+      }
     }   
 
+    @Override
     public String toString() {
       return getHostAddress() + ":" + remotePort; 
     }
@@ -516,7 +762,9 @@
           if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
             //Warning is ok since this is not supposed to happen.
             LOG.warn("Incorrect header or version mismatch from " + 
-                     hostAddress + ":" + remotePort);
+                     hostAddress + ":" + remotePort +
+                     " got version " + version + 
+                     " expected version " + CURRENT_VERSION);
             return -1;
           }
           dataLengthBuffer.clear();
@@ -589,8 +837,6 @@
       if (!channel.isOpen())
         return;
       try {socket.shutdownOutput();} catch(Exception e) {}
-      try {out.close();} catch(Exception e) {}
-      try {channelOut.destroy();} catch(Exception e) {}
       if (channel.isOpen()) {
         try {channel.close();} catch(Exception e) {}
       }
@@ -607,9 +853,11 @@
       this.setName("IPC Server handler "+ instanceNumber + " on " + port);
     }
 
+    @Override
     public void run() {
       LOG.info(getName() + ": starting");
       SERVER.set(Server.this);
+      ByteArrayOutputStream buf = new ByteArrayOutputStream(10240);
       while (running) {
         try {
           Call call;
@@ -648,28 +896,20 @@
             error = StringUtils.stringifyException(e);
           }
           CurCall.set(null);
-            
-          DataOutputStream out = call.connection.out;
-          synchronized (out) {
-            try {
-              out.writeInt(call.id);                // write call id
-              out.writeBoolean(error!=null);        // write error flag
-              if (error == null) {
-                value.write(out);
-              } else {
-                WritableUtils.writeString(out, errorClass);
-                WritableUtils.writeString(out, error);
-              }
-              out.flush();
-            } catch (Exception e) {
-              LOG.warn(getName()+", call "+call+": output error", e);
-              synchronized (connectionList) {
-                if (connectionList.remove(call.connection))
-                  numConnections--;
-              }
-              call.connection.close();
-            }
+
+          buf.reset();
+          DataOutputStream out = new DataOutputStream(buf);
+          out.writeInt(call.id);                // write call id
+          out.writeBoolean(error != null);      // write error flag
+
+          if (error == null) {
+            value.write(out);
+          } else {
+            WritableUtils.writeString(out, errorClass);
+            WritableUtils.writeString(out, error);
           }
+          call.setResponse(ByteBuffer.wrap(buf.toByteArray()));
+          responder.doRespond(call);
         } catch (InterruptedException e) {
           if (running) {                          // unexpected -- log it
             LOG.info(getName() + " caught: " + e, e);
@@ -695,6 +935,7 @@
     this.paramClass = paramClass;
     this.handlerCount = handlerCount;
     this.timeout = conf.getInt("ipc.client.timeout", 10000);
+    this.socketSendBufferSize = 0;
     maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME);
     maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
     this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
@@ -704,13 +945,20 @@
     // Start the listener here and let it bind to the port
     listener = new Listener();
     this.port = listener.getAddress().getPort();    
+
+    // Create the responder here
+    responder = new Responder();
   }
 
   /** Sets the timeout used for network i/o. */
   public void setTimeout(int timeout) { this.timeout = timeout; }
 
+  /**
+   * Sets the socket send buffer size used for responding to RPCs.
+   * The value is applied to a connection's socket when that Connection is
+   * constructed. 0 (the value set by the constructor) leaves the socket's
+   * default send buffer size unchanged.
+   *
+   * @param size send buffer size in bytes, or 0 for the default
+   */
+  public void setSocketSendBufSize(int size) {
+    this.socketSendBufferSize = size;
+  }
+
   /** Starts the service.  Must be called before any calls will be handled. */
   public synchronized void start() throws IOException {
+    responder.start();
     listener.start();
     handlers = new Handler[handlerCount];
     
@@ -733,6 +981,7 @@
     }
     listener.interrupt();
     listener.doStop();
+    responder.interrupt();
     notifyAll();
   }
 

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java?rev=603795&r1=603794&r2=603795&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java Wed Dec 12 17:04:47 2007
@@ -25,6 +25,8 @@
 import junit.framework.TestCase;
 
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
 
 import org.apache.commons.logging.*;
 
@@ -43,6 +45,9 @@
   
   private static Configuration conf = new Configuration();
 
+  int datasize = 1024*100;
+  int numThreads = 50;
+
   public TestRPC(String name) { super(name); }
 	
   public interface TestProtocol extends VersionedProtocol {
@@ -56,6 +61,7 @@
     int add(int[] values) throws IOException;
     int error() throws IOException;
     void testServerGet() throws IOException;
+    int[] exchange(int[] values) throws IOException;
   }
 
   public class TestImpl implements TestProtocol {
@@ -95,8 +101,117 @@
       }
     }
 
+    /**
+     * Overwrites each element with its own index and returns the array.
+     * This lets callers check that a large payload made the round trip
+     * intact.
+     */
+    public int[] exchange(int[] values) {
+      for (int i = 0; i < values.length; i++) {
+        values[i] = i;
+      }
+      return values;
+    }
+  }
+
+  /**
+   * A Runnable that makes two RPCs through a shared proxy: an exchange() of
+   * a datasize-element array, then add(1, 2). It checks both results.
+   * Used to run many large transfers concurrently.
+   *
+   * NOTE(review): assertion failures thrown here happen on a worker thread.
+   * JUnit does not report them by itself; the caller only joins the threads.
+   */
+  static class Transactions implements Runnable {
+    int datasize;
+    TestProtocol proxy;
+
+    Transactions(TestProtocol proxy, int datasize) {
+      this.proxy = proxy;
+      this.datasize = datasize;
+    }
+
+    // do two RPC that transfers data.
+    public void run() {
+      int[] indata = new int[datasize];
+      int[] outdata = null;
+      int val = 0;
+      try {
+        outdata = proxy.exchange(indata);
+        val = proxy.add(1,2);
+      } catch (IOException e) {
+        fail("Exception from RPC exchange() "  + e);
+      }
+      // JUnit's assertEquals takes (expected, actual).
+      assertEquals(indata.length, outdata.length);
+      assertEquals(3, val);
+      for (int i = 0; i < outdata.length; i++) {
+        assertEquals(i, outdata[i]);
+      }
+    }
+  }
+
+  /**
+   * A Runnable that makes one ping() through the given proxy and records
+   * when it returns. testSlowRpc uses it with a proxy whose client has been
+   * told to stall before reading its response.
+   */
+  static class SlowRPC implements Runnable {
+    private final TestProtocol proxy;
+    private volatile boolean done = false;
+
+    SlowRPC(TestProtocol proxy) {
+      this.proxy = proxy;
+    }
+
+    /** @return true once ping() has returned without an exception */
+    boolean isDone() {
+      return done;
+    }
+
+    public void run() {
+      try {
+        proxy.ping();           // this would hang until simulateError is false
+      } catch (IOException e) {
+        assertTrue("SlowRPC ping exception " + e, false);
+        return;
+      }
+      done = true;
+    }
+  }
+
+  /**
+   * Verifies that a client which does not read its response cannot stop the
+   * server from answering other clients. proxy1's client is made to stall
+   * before reading its response. proxy2, on another client, must still
+   * complete a ping(). After the stall ends, proxy1's call must finish
+   * within a bounded time.
+   */
+  public void testSlowRpc() throws Exception {
+    System.out.println("Testing Slow RPC");
+    Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, conf);
+    server.start();
+    Client client = null;
+    try {
+      InetSocketAddress addr = server.getListenerAddress();
+
+      // create a client and make an RPC that does not read its response
+      //
+      TestProtocol proxy1 =
+        (TestProtocol)RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
+      Collection collection = RPC.allClients();
+      assertEquals("There should be only one client.", 1, collection.size());
+      Iterator iter = collection.iterator();
+      client = (Client) iter.next();
+
+      client.simulateError(true);
+      // Forget proxy1's client so that the getProxy() below creates another.
+      RPC.removeClients();
+      SlowRPC slowrpc = new SlowRPC(proxy1);
+      Thread thread = new Thread(slowrpc, "SlowRPC");
+      thread.start();
+      assertTrue("Slow RPC should not have finished1.", !slowrpc.isDone());
+
+      // create another client and make another RPC to the same server. This
+      // should complete even though the first one is still hanging.
+      //
+      TestProtocol proxy2 =
+        (TestProtocol)RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
+      proxy2.ping();
+
+      // verify that the first RPC is still stuck
+      assertTrue("Slow RPC should not have finished2.", !slowrpc.isDone());
+
+      // Make the first RPC process its response, but do not wait forever:
+      // a regression should fail the test, not hang it.
+      client.simulateError(false);
+      final long timeoutMs = 60000;
+      long deadline = System.currentTimeMillis() + timeoutMs;
+      while (!slowrpc.isDone()) {
+        assertTrue("Slow RPC did not finish within " + timeoutMs + " ms.",
+                   System.currentTimeMillis() < deadline);
+        System.out.println("Waiting for slow RPC to get done.");
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {}
+      }
+    } finally {
+      if (client != null) {
+        client.simulateError(false);  // never leave the reader parked
+      }
+      server.stop();
+    }
   }
 
+
   public void testCalls() throws Exception {
     Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, conf);
     server.start();
@@ -142,6 +257,26 @@
 
     proxy.testServerGet();
 
+    // create multiple threads and make them do large data transfers
+    System.out.println("Starting multi-threaded RPC test...");
+    server.setSocketSendBufSize(1024);
+    Thread threadId[] = new Thread[numThreads];
+    for (int i = 0; i < numThreads; i++) {
+      Transactions trans = new Transactions(proxy, datasize);
+      threadId[i] = new Thread(trans, "TransactionThread-" + i);
+      threadId[i].start();
+    }
+
+    // wait for all transactions to get over
+    System.out.println("Waiting for all threads to finish RPCs...");
+    for (int i = 0; i < numThreads; i++) {
+      try {
+        threadId[i].join();
+      } catch (InterruptedException e) {
+        i--;      // retry
+      }
+    }
+
     // try some multi-calls
     Method echo =
       TestProtocol.class.getMethod("echo", new Class[] { String.class });
@@ -161,5 +296,4 @@
     new TestRPC("test").testCalls();
 
   }
-
 }