You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/06/21 20:15:11 UTC

svn commit: r416055 - in /lucene/hadoop/trunk: CHANGES.txt conf/hadoop-default.xml src/java/org/apache/hadoop/ipc/Client.java src/java/org/apache/hadoop/ipc/Server.java

Author: cutting
Date: Wed Jun 21 11:15:11 2006
New Revision: 416055

URL: http://svn.apache.org/viewvc?rev=416055&view=rev
Log:
HADOOP-210.  Change RPC server to use a selector instead of a thread per connection.  Contributed by Devaraj Das.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=416055&r1=416054&r2=416055&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Jun 21 11:15:11 2006
@@ -20,6 +20,12 @@
     web ui.  Also attempt to log a thread dump of child processes
     before they're killed.  (omalley via cutting)
 
+ 6. HADOOP-210.  Change RPC server to use a selector instead of a
+    thread per connection.  This should make it easier to scale to
+    larger clusters.  Note that this incompatibly changes the RPC
+    protocol: clients and servers must both be upgraded to the new
+    version to ensure correct operation.  (Devaraj Das via cutting)
+
 
 Release 0.3.2 - 2006-06-09
 

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=416055&r1=416054&r2=416055&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed Jun 21 11:15:11 2006
@@ -332,4 +332,28 @@
   <description>Defines the timeout for IPC calls in milliseconds.</description>
 </property>
 
+<property>
+  <name>ipc.client.idlethreshold</name>
+  <value>4000</value>
+  <description>Defines the threshold number of connections after which
+               connections will be inspected for idleness.
+  </description>
+</property>
+
+<property>
+  <name>ipc.client.maxidletime</name>
+  <value>120000</value>
+  <description>Defines the maximum idle time, in milliseconds, for a connected
+               client after which it may be disconnected.
+  </description>
+</property>
+
+<property>
+  <name>ipc.client.kill.max</name>
+  <value>10</value>
+  <description>Defines the maximum number of clients to disconnect in one go.
+  </description>
+</property>
+
+
 </configuration>

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=416055&r1=416054&r2=416055&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java Wed Jun 21 11:15:11 2006
@@ -38,6 +38,7 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.DataOutputBuffer;
 
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -196,8 +197,15 @@
             LOG.debug(getName() + " sending #" + call.id);
           try {
             writingCall = call;
-            out.writeInt(call.id);
-            call.param.write(out);
+            DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
+                                                         //data to be written
+            d.writeInt(call.id);
+            call.param.write(d);
+            byte[] data = d.getData();
+            int dataLength = d.getLength();
+
+            out.writeInt(dataLength);      //first put the data length
+            out.write(data, 0, dataLength);//write the data
             out.flush();
           } finally {
             writingCall = null;
@@ -208,7 +216,7 @@
         if (error)
           close();                                // close on error
       }
-    }
+    }  
 
     /** Close the connection and remove it from the pool. */
     public void close() {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=416055&r1=416054&r2=416055&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Wed Jun 21 11:15:11 2006
@@ -20,17 +20,26 @@
 import java.io.EOFException;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.StringWriter;
 import java.io.PrintWriter;
+import java.io.ByteArrayInputStream;
 
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.BufferUnderflowException;
+
+import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.net.ServerSocket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
 
+import java.util.Collections;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.Iterator;
+import java.util.Random;
 
 import org.apache.commons.logging.*;
 
@@ -38,7 +47,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.UTF8;
+
+import org.mortbay.http.nio.SocketChannelOutputStream;
 
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -65,6 +75,16 @@
   private int handlerCount;                       // number of handler threads
   private int maxQueuedCalls;                     // max number of queued calls
   private Class paramClass;                       // class of call parameters
+  private int maxIdleTime;                        // the maximum idle time after 
+                                                  // which a client may be disconnected
+  private int thresholdIdleConnections;           // the number of open connections
+                                                  // above which we will start
+                                                  // cleaning up idle
+                                                  // connections
+  int maxConnectionsToNuke;                       // the max number of 
+                                                  // connections to nuke
+                                                  //during a cleanup
+  
   private Configuration conf;
 
   private int timeout;
@@ -73,6 +93,12 @@
   private LinkedList callQueue = new LinkedList(); // queued calls
   private Object callDequeued = new Object();     // used by wait/notify
 
+  private List connectionList = 
+       Collections.synchronizedList(new LinkedList()); //maintain a list
+                                                       //of client connections
+  private Listener listener;
+  private int numConnections = 0;
+  
   /** A call queued for handling. */
   private static class Call {
     private int id;                               // the client's call id
@@ -86,113 +112,323 @@
     }
   }
 
-  /** Listens on the socket, starting new connection threads. */
+  /** Listens on the socket. Creates jobs for the handler threads*/
   private class Listener extends Thread {
-    private ServerSocket socket;
-
+    
+    private ServerSocketChannel acceptChannel = null; //the accept channel
+    private Selector selector = null; //the selector that we use for the server
+    private InetSocketAddress address; //the address we bind at
+    private Random rand = new Random();
+    private long lastCleanupRunTime = 0; //the last time a cleanup of idle
+                                         //connections ran
+    private long cleanupInterval = 10000; //the minimum interval between 
+                                          //two cleanup runs
+    
     public Listener() throws IOException {
-      this.socket = new ServerSocket(port);
-      socket.setSoTimeout(timeout);
-      this.setDaemon(true);
+      address = new InetSocketAddress(port);
+      // Create a new server socket and set to non blocking mode
+      acceptChannel = ServerSocketChannel.open();
+      acceptChannel.configureBlocking(false);
+
+      // Bind the server socket to the local host and port
+      acceptChannel.socket().bind(address);
+      // create a selector;
+      selector= Selector.open();
+
+      // Register accepts on the server socket with the selector.
+      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
       this.setName("Server listener on port " + port);
+      this.setDaemon(true);
+    }
+    /** Cleans up idle connections in connectionList. Scans a randomly chosen
+     * range and disconnects at most maxConnectionsToNuke connections per run.
+     * A connection is eligible for cleanup when it has been idle longer than
+     * maxIdleTime. If 'force' is true, all connections are scanned and the
+     * connection threshold, minimum interval, and per-run limit are ignored.
+     */
+    private void cleanupConnections(boolean force) {
+      if (force || numConnections > thresholdIdleConnections) {
+        long currentTime = System.currentTimeMillis();
+        if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
+          return;
+        }
+        int start = 0;
+        int end = numConnections - 1;
+        if (!force) {
+          start = rand.nextInt() % numConnections;
+          end = rand.nextInt() % numConnections;
+          int temp;
+          if (end < start) {
+            temp = start;
+            start = end;
+            end = temp;
+          }
+        }
+        int i = start;
+        int numNuked = 0;
+        while (i <= end) {
+          Connection c;
+          synchronized (connectionList) {
+            try {
+              c = (Connection)connectionList.get(i);
+            } catch (Exception e) {return;}
+          }
+          if (c.timedOut(currentTime)) {
+            synchronized (connectionList) {
+              if (connectionList.remove(c))
+                numConnections--;
+            }
+            try {
+              LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
+              c.close();
+            } catch (Exception e) {}
+            numNuked++;
+            end--;
+            c = null;
+            if (!force && numNuked == maxConnectionsToNuke) break;
+          }
+          else i++;
+        }
+        lastCleanupRunTime = System.currentTimeMillis();
+      }
     }
 
     public void run() {
       LOG.info(getName() + ": starting");
+      SERVER.set(Server.this);
       while (running) {
-        Socket acceptedSock = null;
+        SelectionKey key = null;
         try {
-          acceptedSock = socket.accept();
-          new Connection(acceptedSock).start(); // start a new connection
-        } catch (SocketTimeoutException e) {      // ignore timeouts
+          selector.select();
+          Iterator iter = selector.selectedKeys().iterator();
+          
+          while (iter.hasNext()) {
+            key = (SelectionKey)iter.next();
+            if (key.isAcceptable())
+              doAccept(key);
+            else if (key.isReadable())
+              doRead(key);
+            iter.remove();
+            key = null;
+          }
         } catch (OutOfMemoryError e) {
           // we can run out of memory if we have too many threads
           // log the event and sleep for a minute and give 
           // some thread(s) a chance to finish
-          LOG.warn(getName() + " out of memory, sleeping...", e);          
+          closeCurrentConnection(key, e);
+          cleanupConnections(true);
+          try { Thread.sleep(60000); } catch (Exception ie) {}
+        } catch (Exception e) {
+          closeCurrentConnection(key, e);
+        }
+        cleanupConnections(false);
+      }
+      LOG.info("Stopping " + this.getName());
+
+      try {
+        if (acceptChannel != null)
+          acceptChannel.close();
+        if (selector != null)
+          selector.close();
+      } catch (IOException e) { }
+
+      selector= null;
+      acceptChannel= null;
+      connectionList = null;
+    }
+
+    private void closeCurrentConnection(SelectionKey key, Throwable e) {
+      if (running) {
+        LOG.warn("selector: " + e);
+        e.printStackTrace();
+      }
+      if (key != null) {
+        Connection c = (Connection)key.attachment();
+        if (c != null) {
+          synchronized (connectionList) {
+            if (connectionList.remove(c))
+              numConnections--;
+          }
           try {
-            acceptedSock.close();
-            Thread.sleep(60000);
-          } catch (InterruptedException ie) { // ignore interrupts
-          } catch (IOException ioe) { // ignore IOexceptions
-          }          
+            LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
+            c.close();
+          } catch (Exception ex) {}
+          c = null;
         }
-        catch (Exception e) {           // log all other exceptions
-          LOG.info(getName() + " caught: " + e, e);
-        }        
       }
+    }
+
+    void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
+      Connection c = null;
+      ServerSocketChannel server = (ServerSocketChannel) key.channel();
+      SocketChannel channel = server.accept();
+      channel.configureBlocking(false);
+      SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
+      c = new Connection(readKey, channel, System.currentTimeMillis());
+      readKey.attach(c);
+      synchronized (connectionList) {
+        connectionList.add(numConnections, c);
+        numConnections++;
+      }
+      LOG.info("Server connection on port " + port + " from " + 
+                c.getHostAddress() +
+                ": starting. Number of active connections: " + numConnections);
+    }
+
+    void doRead(SelectionKey key) {
+      int count = 0;
+      if (!key.isValid() || !key.isReadable())
+        return;
+      Connection c = (Connection)key.attachment();
+      if (c == null) {
+        return;  
+      }
+      c.setLastContact(System.currentTimeMillis());
+      
       try {
-        socket.close();
-      } catch (IOException e) {
-        LOG.info(getName() + ": e=" + e);
+        count = c.readAndProcess();
+      } catch (Exception e) {
+        LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count);
+        e.printStackTrace();
+        count = -1; //so that the (count < 0) block is executed
       }
-      LOG.info(getName() + ": exiting");
+      if (count < 0) {
+        synchronized (connectionList) {
+          if (connectionList.remove(c))
+            numConnections--;
+        }
+        try {
+          LOG.info(getName() + ": disconnecting client " + 
+                  c.getHostAddress() + ". Number of active connections: "+
+                  numConnections);
+          c.close();
+        } catch (Exception e) {}
+        c = null;
+      }
+      else {
+        c.setLastContact(System.currentTimeMillis());
+      }
+    }   
+
+    void doStop()
+    {
+        selector.wakeup();
+        Thread.yield();
     }
   }
 
   /** Reads calls from a connection and queues them for handling. */
-  private class Connection extends Thread {
-    private Socket socket;
-    private DataInputStream in;
+  private class Connection {
+    private SocketChannel channel;
+    private SelectionKey key;
+    private ByteBuffer data;
+    private ByteBuffer dataLengthBuffer;
     private DataOutputStream out;
+    private SocketChannelOutputStream channelOut;
+    private long lastContact;
+    private int dataLength;
+    private Socket socket;
 
-    public Connection(Socket socket) throws IOException {
-      this.socket = socket;
-      socket.setSoTimeout(timeout);
-      this.in = new DataInputStream
-        (new BufferedInputStream(socket.getInputStream()));
+    public Connection(SelectionKey key, SocketChannel channel, 
+    long lastContact) {
+      this.key = key;
+      this.channel = channel;
+      this.lastContact = lastContact;
+      this.data = null;
+      this.dataLengthBuffer = null;
+      this.socket = channel.socket();
       this.out = new DataOutputStream
-        (new BufferedOutputStream(socket.getOutputStream()));
-      this.setDaemon(true);
-      this.setName("Server connection on port " + port + " from "
-                   + socket.getInetAddress().getHostAddress());
+        (new BufferedOutputStream(
+         this.channelOut = new SocketChannelOutputStream(channel, 4096)));
+    }   
+
+    public String getHostAddress() {
+      return socket.getInetAddress().getHostAddress();
     }
 
-    public void run() {
-      LOG.info(getName() + ": starting");
-      SERVER.set(Server.this);
-      try {
-        while (running) {
-          int id;
-          try {
-            id = in.readInt();                    // try to read an id
-          } catch (SocketTimeoutException e) {
-            continue;
-          }
-        
-          if (LOG.isDebugEnabled())
-            LOG.debug(getName() + " got #" + id);
-        
-          Writable param = makeParam();           // read param
-          param.readFields(in);        
+    public void setLastContact(long lastContact) {
+      this.lastContact = lastContact;
+    }
+
+    public long getLastContact() {
+      return lastContact;
+    }
+
+    private boolean timedOut() {
+      if(System.currentTimeMillis() -  lastContact > maxIdleTime)
+        return true;
+      return false;
+    }
+
+    private boolean timedOut(long currentTime) {
+        if(currentTime -  lastContact > maxIdleTime)
+          return true;
+        return false;
+    }
+
+    public int readAndProcess() throws IOException, InterruptedException {
+      int count = -1;
+      if (dataLengthBuffer == null)
+        dataLengthBuffer = ByteBuffer.allocateDirect(4);
+      if (dataLengthBuffer.remaining() > 0) {
+        count = channel.read(dataLengthBuffer);
+        if (count < 0) return count;
+        if (dataLengthBuffer.remaining() == 0) {
+          dataLengthBuffer.flip(); 
+          dataLength = dataLengthBuffer.getInt();
+          data = ByteBuffer.allocateDirect(dataLength);
+        }
+        //return count;  NOTE(review): if the length header is still incomplete, data is null and the read below throws
+      }
+      count = channel.read(data);
+      if (data.remaining() == 0) {
+        data.flip();
+        processData();
+        data = dataLengthBuffer = null; 
+      }
+      return count;
+    }
+
+    private void processData() throws  IOException, InterruptedException {
+      byte[] bytes = new byte[dataLength];
+      data.get(bytes);
+      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
+      int id = dis.readInt();                    // try to read an id
         
-          Call call = new Call(id, param, this);
+      if (LOG.isDebugEnabled())
+        LOG.debug(" got #" + id);
+            
+      Writable param = makeParam();           // read param
+      param.readFields(dis);        
         
-          synchronized (callQueue) {
-            callQueue.addLast(call);              // queue the call
-            callQueue.notify();                   // wake up a waiting handler
-          }
+      Call call = new Call(id, param, this);
+      synchronized (callQueue) {
+        callQueue.addLast(call);              // queue the call
+        callQueue.notify();                   // wake up a waiting handler
+      }
         
-          while (running && callQueue.size() >= maxQueuedCalls) {
-            synchronized (callDequeued) {         // queue is full
-              callDequeued.wait(timeout);         // wait for a dequeue
-            }
-          }
+      while (running && callQueue.size() >= maxQueuedCalls) {
+        synchronized (callDequeued) {         // queue is full
+          callDequeued.wait(timeout);         // wait for a dequeue
         }
-      } catch (EOFException eof) {
-          // This is what happens on linux when the other side shuts down
-      } catch (SocketException eof) {
-          // This is what happens on Win32 when the other side shuts down
-      } catch (Exception e) {
-        LOG.info(getName() + " caught: " + e, e);
-      } finally {
-        try {
-          socket.close();
-        } catch (IOException e) {}
-        LOG.info(getName() + ": exiting");
       }
     }
 
+    private void close() throws IOException {
+      data = null;
+      dataLengthBuffer = null;
+      if (!channel.isOpen())
+        return;
+      try {socket.shutdownOutput();} catch(Exception e) {}
+      try {out.close();} catch(Exception e) {}
+      try {channelOut.destroy();} catch(Exception e) {}
+      if (channel.isOpen()) {
+        try {channel.close();} catch(Exception e) {}
+      }
+      try {socket.close();} catch(Exception e) {}
+      try {key.cancel();} catch(Exception e) {}
+      key = null;
+    }
   }
 
   /** Handles queued calls . */
@@ -237,15 +473,24 @@
             
           DataOutputStream out = call.connection.out;
           synchronized (out) {
-            out.writeInt(call.id);                // write call id
-            out.writeBoolean(error!=null);        // write error flag
-            if (error == null) {
-              value.write(out);
-            } else {
-              WritableUtils.writeString(out, errorClass);
-              WritableUtils.writeString(out, error);
+            try {
+              out.writeInt(call.id);                // write call id
+              out.writeBoolean(error!=null);        // write error flag
+              if (error == null) {
+                value.write(out);
+              } else {
+                WritableUtils.writeString(out, errorClass);
+                WritableUtils.writeString(out, error);
+              }
+              out.flush();
+            } catch (Exception e) {
+              e.printStackTrace();
+              synchronized (connectionList) {
+                if (connectionList.remove(call.connection))
+                  numConnections--;
+              }
+              call.connection.close();
             }
-            out.flush();
           }
 
         } catch (Exception e) {
@@ -275,7 +520,10 @@
     this.paramClass = paramClass;
     this.handlerCount = handlerCount;
     this.maxQueuedCalls = handlerCount;
-    this.timeout = conf.getInt("ipc.client.timeout",10000); 
+    this.timeout = conf.getInt("ipc.client.timeout",10000);
+    this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
+    this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
+    this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
   }
 
   /** Sets the timeout used for network i/o. */
@@ -283,7 +531,7 @@
 
   /** Starts the service.  Must be called before any calls will be handled. */
   public synchronized void start() throws IOException {
-    Listener listener = new Listener();
+    listener = new Listener();
     listener.start();
     
     for (int i = 0; i < handlerCount; i++) {
@@ -298,6 +546,7 @@
   public synchronized void stop() {
     LOG.info("Stopping server on " + port);
     running = false;
+    listener.doStop();
     try {
       Thread.sleep(timeout);     //  inexactly wait for pending requests to finish
     } catch (InterruptedException e) {}