You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/06/21 20:15:11 UTC
svn commit: r416055 - in /lucene/hadoop/trunk: CHANGES.txt
conf/hadoop-default.xml src/java/org/apache/hadoop/ipc/Client.java
src/java/org/apache/hadoop/ipc/Server.java
Author: cutting
Date: Wed Jun 21 11:15:11 2006
New Revision: 416055
URL: http://svn.apache.org/viewvc?rev=416055&view=rev
Log:
HADOOP-210. Change RPC server to use a selector instead of a thread per connection. Contributed by Devaraj Das.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/conf/hadoop-default.xml
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=416055&r1=416054&r2=416055&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Jun 21 11:15:11 2006
@@ -20,6 +20,12 @@
web ui. Also attempt to log a thread dump of child processes
before they're killed. (omalley via cutting)
+ 6. HADOOP-210. Change RPC server to use a selector instead of a
+ thread per connection. This should make it easier to scale to
+ larger clusters. Note that this incompatibly changes the RPC
+ protocol: clients and servers must both be upgraded to the new
+ version to ensure correct operation. (Devaraj Das via cutting)
+
Release 0.3.2 - 2006-06-09
Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=416055&r1=416054&r2=416055&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed Jun 21 11:15:11 2006
@@ -332,4 +332,28 @@
<description>Defines the timeout for IPC calls in milliseconds.</description>
</property>
+<property>
+ <name>ipc.client.idlethreshold</name>
+ <value>4000</value>
+ <description>Defines the threshold number of connections after which
+ connections will be inspected for idleness.
+ </description>
+</property>
+
+<property>
+ <name>ipc.client.maxidletime</name>
+ <value>120000</value>
+ <description>Defines the maximum idle time for a connected client after
+ which it may be disconnected.
+ </description>
+</property>
+
+<property>
+ <name>ipc.client.kill.max</name>
+ <value>10</value>
+ <description>Defines the maximum number of clients to disconnect in one go.
+ </description>
+</property>
+
+
</configuration>
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=416055&r1=416054&r2=416055&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java Wed Jun 21 11:15:11 2006
@@ -38,6 +38,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.DataOutputBuffer;
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -196,8 +197,15 @@
LOG.debug(getName() + " sending #" + call.id);
try {
writingCall = call;
- out.writeInt(call.id);
- call.param.write(out);
+ DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
+ //data to be written
+ d.writeInt(call.id);
+ call.param.write(d);
+ byte[] data = d.getData();
+ int dataLength = d.getLength();
+
+ out.writeInt(dataLength); //first put the data length
+ out.write(data, 0, dataLength);//write the data
out.flush();
} finally {
writingCall = null;
@@ -208,7 +216,7 @@
if (error)
close(); // close on error
}
- }
+ }
/** Close the connection and remove it from the pool. */
public void close() {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=416055&r1=416054&r2=416055&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Wed Jun 21 11:15:11 2006
@@ -20,17 +20,26 @@
import java.io.EOFException;
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.StringWriter;
import java.io.PrintWriter;
+import java.io.ByteArrayInputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.BufferUnderflowException;
+
+import java.net.InetSocketAddress;
import java.net.Socket;
-import java.net.ServerSocket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
+import java.util.Collections;
import java.util.LinkedList;
+import java.util.List;
+import java.util.Iterator;
+import java.util.Random;
import org.apache.commons.logging.*;
@@ -38,7 +47,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.UTF8;
+
+import org.mortbay.http.nio.SocketChannelOutputStream;
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -65,6 +75,16 @@
private int handlerCount; // number of handler threads
private int maxQueuedCalls; // max number of queued calls
private Class paramClass; // class of call parameters
+ private int maxIdleTime; // the maximum idle time after
+ // which a client may be disconnected
+ private int thresholdIdleConnections; // the number of idle connections
+ // after which we will start
+ // cleaning up idle
+ // connections
+ int maxConnectionsToNuke; // the max number of
+ // connections to nuke
+ //during a cleanup
+
private Configuration conf;
private int timeout;
@@ -73,6 +93,12 @@
private LinkedList callQueue = new LinkedList(); // queued calls
private Object callDequeued = new Object(); // used by wait/notify
+ private List connectionList =
+ Collections.synchronizedList(new LinkedList()); //maintain a list
+ //of client connections
+ private Listener listener;
+ private int numConnections = 0;
+
/** A call queued for handling. */
private static class Call {
private int id; // the client's call id
@@ -86,113 +112,323 @@
}
}
- /** Listens on the socket, starting new connection threads. */
+ /** Listens on the socket. Creates jobs for the handler threads*/
private class Listener extends Thread {
- private ServerSocket socket;
-
+
+ private ServerSocketChannel acceptChannel = null; //the accept channel
+ private Selector selector = null; //the selector that we use for the server
+ private InetSocketAddress address; //the address we bind at
+ private Random rand = new Random();
+ private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
+ //-tion (for idle connections) ran
+ private long cleanupInterval = 10000; //the minimum interval between
+ //two cleanup runs
+
public Listener() throws IOException {
- this.socket = new ServerSocket(port);
- socket.setSoTimeout(timeout);
- this.setDaemon(true);
+ address = new InetSocketAddress(port);
+ // Create a new server socket and set to non blocking mode
+ acceptChannel = ServerSocketChannel.open();
+ acceptChannel.configureBlocking(false);
+
+ // Bind the server socket to the local host and port
+ acceptChannel.socket().bind(address);
+ // create a selector;
+ selector= Selector.open();
+
+ // Register accepts on the server socket with the selector.
+ acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("Server listener on port " + port);
+ this.setDaemon(true);
+ }
+ /** cleanup connections from connectionList. Choose a random range
+ * to scan and also have a limit on the number of the connections
+ * that will be cleaned up per run. The criterion for cleanup is the time
+ * for which the connection was idle. If 'force' is true then all
+ * connections will be looked at for the cleanup.
+ */
+ private void cleanupConnections(boolean force) {
+ if (force || numConnections > thresholdIdleConnections) {
+ long currentTime = System.currentTimeMillis();
+ if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
+ return;
+ }
+ int start = 0;
+ int end = numConnections - 1;
+ if (!force) {
+ start = rand.nextInt() % numConnections;
+ end = rand.nextInt() % numConnections;
+ int temp;
+ if (end < start) {
+ temp = start;
+ start = end;
+ end = temp;
+ }
+ }
+ int i = start;
+ int numNuked = 0;
+ while (i <= end) {
+ Connection c;
+ synchronized (connectionList) {
+ try {
+ c = (Connection)connectionList.get(i);
+ } catch (Exception e) {return;}
+ }
+ if (c.timedOut(currentTime)) {
+ synchronized (connectionList) {
+ if (connectionList.remove(c))
+ numConnections--;
+ }
+ try {
+ LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
+ c.close();
+ } catch (Exception e) {}
+ numNuked++;
+ end--;
+ c = null;
+ if (!force && numNuked == maxConnectionsToNuke) break;
+ }
+ else i++;
+ }
+ lastCleanupRunTime = System.currentTimeMillis();
+ }
}
public void run() {
LOG.info(getName() + ": starting");
+ SERVER.set(Server.this);
while (running) {
- Socket acceptedSock = null;
+ SelectionKey key = null;
try {
- acceptedSock = socket.accept();
- new Connection(acceptedSock).start(); // start a new connection
- } catch (SocketTimeoutException e) { // ignore timeouts
+ selector.select();
+ Iterator iter = selector.selectedKeys().iterator();
+
+ while (iter.hasNext()) {
+ key = (SelectionKey)iter.next();
+ if (key.isAcceptable())
+ doAccept(key);
+ else if (key.isReadable())
+ doRead(key);
+ iter.remove();
+ key = null;
+ }
} catch (OutOfMemoryError e) {
// we can run out of memory if we have too many threads
// log the event and sleep for a minute and give
// some thread(s) a chance to finish
- LOG.warn(getName() + " out of memory, sleeping...", e);
+ closeCurrentConnection(key, e);
+ cleanupConnections(true);
+ try { Thread.sleep(60000); } catch (Exception ie) {}
+ } catch (Exception e) {
+ closeCurrentConnection(key, e);
+ }
+ cleanupConnections(false);
+ }
+ LOG.info("Stopping " + this.getName());
+
+ try {
+ if (acceptChannel != null)
+ acceptChannel.close();
+ if (selector != null)
+ selector.close();
+ } catch (IOException e) { }
+
+ selector= null;
+ acceptChannel= null;
+ connectionList = null;
+ }
+
+ private void closeCurrentConnection(SelectionKey key, Throwable e) {
+ if (running) {
+ LOG.warn("selector: " + e);
+ e.printStackTrace();
+ }
+ if (key != null) {
+ Connection c = (Connection)key.attachment();
+ if (c != null) {
+ synchronized (connectionList) {
+ if (connectionList.remove(c))
+ numConnections--;
+ }
try {
- acceptedSock.close();
- Thread.sleep(60000);
- } catch (InterruptedException ie) { // ignore interrupts
- } catch (IOException ioe) { // ignore IOexceptions
- }
+ LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
+ c.close();
+ } catch (Exception ex) {}
+ c = null;
}
- catch (Exception e) { // log all other exceptions
- LOG.info(getName() + " caught: " + e, e);
- }
}
+ }
+
+ void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
+ Connection c = null;
+ ServerSocketChannel server = (ServerSocketChannel) key.channel();
+ SocketChannel channel = server.accept();
+ channel.configureBlocking(false);
+ SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
+ c = new Connection(readKey, channel, System.currentTimeMillis());
+ readKey.attach(c);
+ synchronized (connectionList) {
+ connectionList.add(numConnections, c);
+ numConnections++;
+ }
+ LOG.info("Server connection on port " + port + " from " +
+ c.getHostAddress() +
+ ": starting. Number of active connections: " + numConnections);
+ }
+
+ void doRead(SelectionKey key) {
+ int count = 0;
+ if (!key.isValid() || !key.isReadable())
+ return;
+ Connection c = (Connection)key.attachment();
+ if (c == null) {
+ return;
+ }
+ c.setLastContact(System.currentTimeMillis());
+
try {
- socket.close();
- } catch (IOException e) {
- LOG.info(getName() + ": e=" + e);
+ count = c.readAndProcess();
+ } catch (Exception e) {
+ LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count);
+ e.printStackTrace();
+ count = -1; //so that the (count < 0) block is executed
}
- LOG.info(getName() + ": exiting");
+ if (count < 0) {
+ synchronized (connectionList) {
+ if (connectionList.remove(c))
+ numConnections--;
+ }
+ try {
+ LOG.info(getName() + ": disconnecting client " +
+ c.getHostAddress() + ". Number of active connections: "+
+ numConnections);
+ c.close();
+ } catch (Exception e) {}
+ c = null;
+ }
+ else {
+ c.setLastContact(System.currentTimeMillis());
+ }
+ }
+
+ void doStop()
+ {
+ selector.wakeup();
+ Thread.yield();
}
}
/** Reads calls from a connection and queues them for handling. */
- private class Connection extends Thread {
- private Socket socket;
- private DataInputStream in;
+ private class Connection {
+ private SocketChannel channel;
+ private SelectionKey key;
+ private ByteBuffer data;
+ private ByteBuffer dataLengthBuffer;
private DataOutputStream out;
+ private SocketChannelOutputStream channelOut;
+ private long lastContact;
+ private int dataLength;
+ private Socket socket;
- public Connection(Socket socket) throws IOException {
- this.socket = socket;
- socket.setSoTimeout(timeout);
- this.in = new DataInputStream
- (new BufferedInputStream(socket.getInputStream()));
+ public Connection(SelectionKey key, SocketChannel channel,
+ long lastContact) {
+ this.key = key;
+ this.channel = channel;
+ this.lastContact = lastContact;
+ this.data = null;
+ this.dataLengthBuffer = null;
+ this.socket = channel.socket();
this.out = new DataOutputStream
- (new BufferedOutputStream(socket.getOutputStream()));
- this.setDaemon(true);
- this.setName("Server connection on port " + port + " from "
- + socket.getInetAddress().getHostAddress());
+ (new BufferedOutputStream(
+ this.channelOut = new SocketChannelOutputStream(channel, 4096)));
+ }
+
+ public String getHostAddress() {
+ return socket.getInetAddress().getHostAddress();
}
- public void run() {
- LOG.info(getName() + ": starting");
- SERVER.set(Server.this);
- try {
- while (running) {
- int id;
- try {
- id = in.readInt(); // try to read an id
- } catch (SocketTimeoutException e) {
- continue;
- }
-
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + " got #" + id);
-
- Writable param = makeParam(); // read param
- param.readFields(in);
+ public void setLastContact(long lastContact) {
+ this.lastContact = lastContact;
+ }
+
+ public long getLastContact() {
+ return lastContact;
+ }
+
+ private boolean timedOut() {
+ if(System.currentTimeMillis() - lastContact > maxIdleTime)
+ return true;
+ return false;
+ }
+
+ private boolean timedOut(long currentTime) {
+ if(currentTime - lastContact > maxIdleTime)
+ return true;
+ return false;
+ }
+
+ public int readAndProcess() throws IOException, InterruptedException {
+ int count = -1;
+ if (dataLengthBuffer == null)
+ dataLengthBuffer = ByteBuffer.allocateDirect(4);
+ if (dataLengthBuffer.remaining() > 0) {
+ count = channel.read(dataLengthBuffer);
+ if (count < 0) return count;
+ if (dataLengthBuffer.remaining() == 0) {
+ dataLengthBuffer.flip();
+ dataLength = dataLengthBuffer.getInt();
+ data = ByteBuffer.allocateDirect(dataLength);
+ }
+ //return count;
+ }
+ count = channel.read(data);
+ if (data.remaining() == 0) {
+ data.flip();
+ processData();
+ data = dataLengthBuffer = null;
+ }
+ return count;
+ }
+
+ private void processData() throws IOException, InterruptedException {
+ byte[] bytes = new byte[dataLength];
+ data.get(bytes);
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
+ int id = dis.readInt(); // try to read an id
- Call call = new Call(id, param, this);
+ if (LOG.isDebugEnabled())
+ LOG.debug(" got #" + id);
+
+ Writable param = makeParam(); // read param
+ param.readFields(dis);
- synchronized (callQueue) {
- callQueue.addLast(call); // queue the call
- callQueue.notify(); // wake up a waiting handler
- }
+ Call call = new Call(id, param, this);
+ synchronized (callQueue) {
+ callQueue.addLast(call); // queue the call
+ callQueue.notify(); // wake up a waiting handler
+ }
- while (running && callQueue.size() >= maxQueuedCalls) {
- synchronized (callDequeued) { // queue is full
- callDequeued.wait(timeout); // wait for a dequeue
- }
- }
+ while (running && callQueue.size() >= maxQueuedCalls) {
+ synchronized (callDequeued) { // queue is full
+ callDequeued.wait(timeout); // wait for a dequeue
}
- } catch (EOFException eof) {
- // This is what happens on linux when the other side shuts down
- } catch (SocketException eof) {
- // This is what happens on Win32 when the other side shuts down
- } catch (Exception e) {
- LOG.info(getName() + " caught: " + e, e);
- } finally {
- try {
- socket.close();
- } catch (IOException e) {}
- LOG.info(getName() + ": exiting");
}
}
+ private void close() throws IOException {
+ data = null;
+ dataLengthBuffer = null;
+ if (!channel.isOpen())
+ return;
+ try {socket.shutdownOutput();} catch(Exception e) {}
+ try {out.close();} catch(Exception e) {}
+ try {channelOut.destroy();} catch(Exception e) {}
+ if (channel.isOpen()) {
+ try {channel.close();} catch(Exception e) {}
+ }
+ try {socket.close();} catch(Exception e) {}
+ try {key.cancel();} catch(Exception e) {}
+ key = null;
+ }
}
/** Handles queued calls . */
@@ -237,15 +473,24 @@
DataOutputStream out = call.connection.out;
synchronized (out) {
- out.writeInt(call.id); // write call id
- out.writeBoolean(error!=null); // write error flag
- if (error == null) {
- value.write(out);
- } else {
- WritableUtils.writeString(out, errorClass);
- WritableUtils.writeString(out, error);
+ try {
+ out.writeInt(call.id); // write call id
+ out.writeBoolean(error!=null); // write error flag
+ if (error == null) {
+ value.write(out);
+ } else {
+ WritableUtils.writeString(out, errorClass);
+ WritableUtils.writeString(out, error);
+ }
+ out.flush();
+ } catch (Exception e) {
+ e.printStackTrace();
+ synchronized (connectionList) {
+ if (connectionList.remove(call.connection))
+ numConnections--;
+ }
+ call.connection.close();
}
- out.flush();
}
} catch (Exception e) {
@@ -275,7 +520,10 @@
this.paramClass = paramClass;
this.handlerCount = handlerCount;
this.maxQueuedCalls = handlerCount;
- this.timeout = conf.getInt("ipc.client.timeout",10000);
+ this.timeout = conf.getInt("ipc.client.timeout",10000);
+ this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
+ this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
+ this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
}
/** Sets the timeout used for network i/o. */
@@ -283,7 +531,7 @@
/** Starts the service. Must be called before any calls will be handled. */
public synchronized void start() throws IOException {
- Listener listener = new Listener();
+ listener = new Listener();
listener.start();
for (int i = 0; i < handlerCount; i++) {
@@ -298,6 +546,7 @@
public synchronized void stop() {
LOG.info("Stopping server on " + port);
running = false;
+ listener.doStop();
try {
Thread.sleep(timeout); // inexactly wait for pending requests to finish
} catch (InterruptedException e) {}