You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/06/13 21:18:44 UTC

svn commit: r413958 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/ipc/Client.java src/java/org/apache/hadoop/ipc/Server.java

Author: cutting
Date: Tue Jun 13 12:18:43 2006
New Revision: 413958

URL: http://svn.apache.org/viewvc?rev=413958&view=rev
Log:
HADOOP-210.  Change RPC server to use a selector instead of a thread per connection.  This should make it easier to scale to larger clusters.  Contributed by Devaraj Das.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=413958&r1=413957&r2=413958&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jun 13 12:18:43 2006
@@ -1,6 +1,13 @@
 Hadoop Change Log
 
 
+Trunk (unreleased changes)
+
+ 1. HADOOP-210.  Change RPC client and server so that server uses a
+    selector, and a thread per connection is no longer required.  This
+    should permit larger clusters.  (Devaraj Das via cutting)
+
+
 Release 0.3.2 - 2006-06-09
 
  1. HADOOP-275.  Update the streaming contrib module to use log4j for

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=413958&r1=413957&r2=413958&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java Tue Jun 13 12:18:43 2006
@@ -38,6 +38,7 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.DataOutputBuffer;
 
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -196,8 +197,15 @@
             LOG.debug(getName() + " sending #" + call.id);
           try {
             writingCall = call;
-            out.writeInt(call.id);
-            call.param.write(out);
+            DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
+                                                         //data to be written
+            d.writeInt(call.id);
+            call.param.write(d);
+            byte[] data = d.getData();
+            int dataLength = d.getLength();
+
+            out.writeInt(dataLength);      //first put the data length
+            out.write(data, 0, dataLength);//write the data
             out.flush();
           } finally {
             writingCall = null;
@@ -208,7 +216,7 @@
         if (error)
           close();                                // close on error
       }
-    }
+    }  
 
     /** Close the connection and remove it from the pool. */
     public void close() {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=413958&r1=413957&r2=413958&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Tue Jun 13 12:18:43 2006
@@ -20,17 +20,23 @@
 import java.io.EOFException;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.StringWriter;
 import java.io.PrintWriter;
+import java.io.ByteArrayInputStream;
 
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.BufferUnderflowException;
+
+import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.net.ServerSocket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
 
 import java.util.LinkedList;
+import java.util.Iterator;
+import java.util.Random;
 
 import org.apache.commons.logging.*;
 
@@ -38,7 +44,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.UTF8;
+
+import org.mortbay.http.nio.SocketChannelOutputStream;
 
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -65,6 +72,16 @@
   private int handlerCount;                       // number of handler threads
   private int maxQueuedCalls;                     // max number of queued calls
   private Class paramClass;                       // class of call parameters
+  private int maxIdleTime;                        // the maximum idle time after 
+                                                  // which a client may be disconnected
+  private int thresholdIdleConnections;           // the number of idle connections
+                                                  // after which we will start
+                                                  // cleaning up idle 
+                                                  // connections
+  int maxConnectionsToNuke;                       // the max number of 
+                                                  // connections to nuke
+                                                  //during a cleanup
+  
   private Configuration conf;
 
   private int timeout;
@@ -73,6 +90,12 @@
   private LinkedList callQueue = new LinkedList(); // queued calls
   private Object callDequeued = new Object();     // used by wait/notify
 
+  private InetSocketAddress address; //the address we bind at
+  private ServerSocketChannel acceptChannel = null; //the (main) accept channel
+  private Selector selector = null; //the selector that we use for the server
+  private Listener listener;
+  private int numConnections = 0;
+  
   /** A call queued for handling. */
   private static class Call {
     private int id;                               // the client's call id
@@ -86,113 +109,300 @@
     }
   }
 
-  /** Listens on the socket, starting new connection threads. */
+  /** Listens on the socket and creates jobs for the handler threads. */
   private class Listener extends Thread {
-    private ServerSocket socket;
-
-    public Listener() throws IOException {
-      this.socket = new ServerSocket(port);
-      socket.setSoTimeout(timeout);
+    
+    private LinkedList connectionList = new LinkedList(); //maintain a list
+                                                       //of client connections
+    private Random rand = new Random();
+    private long lastCleanupRunTime = 0; //the last time a cleanup of idle
+                                         //connections ran
+    private int cleanupInterval = 10000; //the minimum interval between 
+                                         //two cleanup runs
+    
+    public Listener() {
+      address = new InetSocketAddress(port);
       this.setDaemon(true);
-      this.setName("Server listener on port " + port);
+    }
+    /** Cleans up connections from connectionList. Chooses a random range
+     * to scan and limits the number of connections that will be cleaned
+     * up per run. The criterion for cleanup is the time for which the
+     * connection was idle. If 'force' is true then all connections will
+     * be looked at for the cleanup.
+     */
+    private void cleanupConnections(boolean force) {
+      if (force || numConnections > thresholdIdleConnections) {
+        long currentTime = System.currentTimeMillis();
+        if (!force && (int)(currentTime - lastCleanupRunTime) < cleanupInterval) {
+          return;
+        }
+        int start = 0;
+        int end = numConnections - 1;
+        if (!force) {
+          start = rand.nextInt() % numConnections;
+          end = rand.nextInt() % numConnections;
+          int temp;
+          if (end < start) {
+            temp = start;
+            start = end;
+            end = temp;
+          }
+        }
+        int i = start;
+        int numNuked = 0;
+        while (i <= end) {
+          Connection c = (Connection)connectionList.get(i);
+          if (c.timedOut(currentTime)) {
+            connectionList.remove(i);
+            try {
+              LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
+              c.close();
+            } catch (Exception e) {}
+            numNuked++;
+            end--;
+            if (!force && numNuked == maxConnectionsToNuke) break;
+          }
+          else i++;
+        }
+        lastCleanupRunTime = System.currentTimeMillis();
+      }
     }
 
     public void run() {
-      LOG.info(getName() + ": starting");
-      while (running) {
-        Socket acceptedSock = null;
-        try {
-          acceptedSock = socket.accept();
-          new Connection(acceptedSock).start(); // start a new connection
-        } catch (SocketTimeoutException e) {      // ignore timeouts
-        } catch (OutOfMemoryError e) {
-          // we can run out of memory if we have too many threads
-          // log the event and sleep for a minute and give 
-          // some thread(s) a chance to finish
-          LOG.warn(getName() + " out of memory, sleeping...", e);          
+      SERVER.set(Server.this);
+      
+      try {
+        // Create a new server socket and set to non blocking mode
+        acceptChannel = ServerSocketChannel.open();
+        acceptChannel.configureBlocking(false);
+
+        // Bind the server socket to the local host and port
+        acceptChannel.socket().bind(address);
+
+        // create a selector;
+        selector= Selector.open();
+
+        // Register accepts on the server socket with the selector.
+        acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
+        this.setName("Server listener on port " + port);
+          
+        LOG.info(getName() + ": starting");
+        
+        while (running) {
+          SelectionKey key = null;
           try {
-            acceptedSock.close();
+            selector.select(timeout);
+            Iterator iter = selector.selectedKeys().iterator();
+            
+            while (iter.hasNext()) {
+              key = (SelectionKey)iter.next();
+              if (key.isAcceptable())
+                doAccept(key);
+              else if (key.isReadable())
+                doRead(key);
+              iter.remove();
+              key = null;
+            }
+          } catch (OutOfMemoryError e) {
+            closeCurrentConnection(key, e);
+            cleanupConnections(true);
             Thread.sleep(60000);
-          } catch (InterruptedException ie) { // ignore interrupts
-          } catch (IOException ioe) { // ignore IOexceptions
-          }          
+          } catch (Exception e) {
+            closeCurrentConnection(key, e);
+          }
+          cleanupConnections(false);
         }
-        catch (Exception e) {           // log all other exceptions
-          LOG.info(getName() + " caught: " + e, e);
-        }        
+      } catch (Exception e) {
+        LOG.fatal("selector",e);  
       }
+      LOG.info("Stopping " + this.getName());
+
       try {
-        socket.close();
-      } catch (IOException e) {
-        LOG.info(getName() + ": e=" + e);
+        if (acceptChannel != null)
+          acceptChannel.close();
+        if (selector != null)
+          selector.close();
+      } catch (IOException e) { }
+
+      selector= null;
+      acceptChannel= null;
+      connectionList = null;
+    }
+
+    private void closeCurrentConnection(SelectionKey key, Throwable e) {
+      if (running) {
+        LOG.warn("selector: " + e);
+        e.printStackTrace();
       }
-      LOG.info(getName() + ": exiting");
+      if (key != null) {
+        Connection c = (Connection)key.attachment();
+        if (c != null) {
+          connectionList.remove(c);
+          try {
+            LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
+            c.close();
+          } catch (Exception ex) {}
+        }
+      }
+    }
+
+    void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
+      Connection c = null;
+      ServerSocketChannel server = (ServerSocketChannel) key.channel();
+      SocketChannel channel = server.accept();
+      channel.configureBlocking(false);
+      SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
+      c = new Connection(readKey, channel, System.currentTimeMillis());
+      readKey.attach(c);
+      connectionList.addLast(c);
+      numConnections++;
+      LOG.info("Server connection on port " + port + " from " + 
+                c.getHostAddress() + ": starting");
+    }
+
+    void doRead(SelectionKey key) {
+      int count = 0;
+      if (!key.isValid() || !key.isReadable())
+        return;
+      Connection c = (Connection)key.attachment();
+      if (c == null) {
+        return;  
+      }
+      c.setLastContact(System.currentTimeMillis());
+      
+      try {
+        count = c.readAndProcess();
+      } catch (Exception e) {
+        LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count);
+        count = -1; //so that the (count < 0) block is executed
+      }
+      if (count < 0) {
+        connectionList.remove(c);
+        try {
+          LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
+          c.close();
+        } catch (Exception e) {}
+      }
+      else {
+        c.setLastContact(System.currentTimeMillis());
+      }
+    }   
+
+    void doStop()
+    {
+        selector.wakeup();
+        Thread.yield();
     }
   }
 
   /** Reads calls from a connection and queues them for handling. */
-  private class Connection extends Thread {
-    private Socket socket;
-    private DataInputStream in;
+  private class Connection {
+    private SocketChannel channel;
+    private SelectionKey key;
+    private ByteBuffer data;
+    private ByteBuffer dataLengthBuffer;
     private DataOutputStream out;
+    private long lastContact;
+    private int dataLength;
+    private Socket socket;
 
-    public Connection(Socket socket) throws IOException {
-      this.socket = socket;
-      socket.setSoTimeout(timeout);
-      this.in = new DataInputStream
-        (new BufferedInputStream(socket.getInputStream()));
+    public Connection(SelectionKey key, SocketChannel channel, 
+    long lastContact) {
+      this.key = key;
+      this.channel = channel;
+      this.lastContact = lastContact;
+      this.data = null;
+      this.dataLengthBuffer = null;
+      this.socket = channel.socket();
       this.out = new DataOutputStream
-        (new BufferedOutputStream(socket.getOutputStream()));
-      this.setDaemon(true);
-      this.setName("Server connection on port " + port + " from "
-                   + socket.getInetAddress().getHostAddress());
+        (new SocketChannelOutputStream(channel, 4096));
+    }   
+
+    public String getHostAddress() {
+      return socket.getInetAddress().getHostAddress();
     }
 
-    public void run() {
-      LOG.info(getName() + ": starting");
-      SERVER.set(Server.this);
-      try {
-        while (running) {
-          int id;
-          try {
-            id = in.readInt();                    // try to read an id
-          } catch (SocketTimeoutException e) {
-            continue;
-          }
-        
-          if (LOG.isDebugEnabled())
-            LOG.debug(getName() + " got #" + id);
-        
-          Writable param = makeParam();           // read param
-          param.readFields(in);        
+    public void setLastContact(long lastContact) {
+      this.lastContact = lastContact;
+    }
+
+    public long getLastContact() {
+      return lastContact;
+    }
+
+    private boolean timedOut() {
+      if(System.currentTimeMillis() -  lastContact > maxIdleTime)
+        return true;
+      return false;
+    }
+
+    private boolean timedOut(long currentTime) {
+        if(currentTime -  lastContact > timeout)
+          return true;
+        return false;
+    }
+
+    public int readAndProcess() throws IOException, InterruptedException {
+      int count = -1;
+      if (dataLengthBuffer == null)
+        dataLengthBuffer = ByteBuffer.allocateDirect(4);
+      if (dataLengthBuffer.remaining() > 0) {
+        count = channel.read(dataLengthBuffer);
+        if (count < 0) return count;
+        if (dataLengthBuffer.remaining() == 0) {
+          dataLengthBuffer.flip(); 
+          dataLength = dataLengthBuffer.getInt();
+          data = ByteBuffer.allocateDirect(dataLength);
+        }
+        return count;
+      }
+      count = channel.read(data);
+      if (data.remaining() == 0) {
+        data.flip();
+        processData();
+        data = dataLengthBuffer = null; 
+      }
+      return count;
+    }
+
+    private void processData() throws  IOException, InterruptedException {
+      byte[] bytes = new byte[dataLength];
+      data.get(bytes);
+      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
+      int id = dis.readInt();                    // try to read an id
         
-          Call call = new Call(id, param, this);
+      if (LOG.isDebugEnabled())
+        LOG.debug(" got #" + id);
+            
+      Writable param = makeParam();           // read param
+      param.readFields(dis);        
         
-          synchronized (callQueue) {
-            callQueue.addLast(call);              // queue the call
-            callQueue.notify();                   // wake up a waiting handler
-          }
+      Call call = new Call(id, param, this);
+      synchronized (callQueue) {
+        callQueue.addLast(call);              // queue the call
+        callQueue.notify();                   // wake up a waiting handler
+      }
         
-          while (running && callQueue.size() >= maxQueuedCalls) {
-            synchronized (callDequeued) {         // queue is full
-              callDequeued.wait(timeout);         // wait for a dequeue
-            }
-          }
+      while (running && callQueue.size() >= maxQueuedCalls) {
+        synchronized (callDequeued) {         // queue is full
+          callDequeued.wait(timeout);         // wait for a dequeue
         }
-      } catch (EOFException eof) {
-          // This is what happens on linux when the other side shuts down
-      } catch (SocketException eof) {
-          // This is what happens on Win32 when the other side shuts down
-      } catch (Exception e) {
-        LOG.info(getName() + " caught: " + e, e);
-      } finally {
-        try {
-          socket.close();
-        } catch (IOException e) {}
-        LOG.info(getName() + ": exiting");
       }
     }
 
+    private void close() throws IOException {
+      data = null;
+      dataLengthBuffer = null;
+      if (!channel.isOpen())
+        return;
+      socket.shutdownOutput();
+      channel.close();
+      socket.close();
+      channel.close();
+      out.close();
+      key.cancel();
+      numConnections--;
+    }
   }
 
   /** Handles queued calls . */
@@ -245,7 +455,6 @@
               WritableUtils.writeString(out, errorClass);
               WritableUtils.writeString(out, error);
             }
-            out.flush();
           }
 
         } catch (Exception e) {
@@ -275,7 +484,10 @@
     this.paramClass = paramClass;
     this.handlerCount = handlerCount;
     this.maxQueuedCalls = handlerCount;
-    this.timeout = conf.getInt("ipc.client.timeout",10000); 
+    this.timeout = conf.getInt("ipc.client.timeout",10000);
+    this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
+    this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
+    this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 120000);
   }
 
   /** Sets the timeout used for network i/o. */
@@ -283,7 +495,7 @@
 
   /** Starts the service.  Must be called before any calls will be handled. */
   public synchronized void start() throws IOException {
-    Listener listener = new Listener();
+    listener = new Listener();
     listener.start();
     
     for (int i = 0; i < handlerCount; i++) {
@@ -298,6 +510,7 @@
   public synchronized void stop() {
     LOG.info("Stopping server on " + port);
     running = false;
+    listener.doStop();
     try {
       Thread.sleep(timeout);     //  inexactly wait for pending requests to finish
     } catch (InterruptedException e) {}