You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on the original archive page) is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/06/13 21:18:44 UTC
svn commit: r413958 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/ipc/Client.java
src/java/org/apache/hadoop/ipc/Server.java
Author: cutting
Date: Tue Jun 13 12:18:43 2006
New Revision: 413958
URL: http://svn.apache.org/viewvc?rev=413958&view=rev
Log:
HADOOP-210. Change RPC server to use a selector instead of a thread per connection. This should make it easier to scale to larger clusters. Contributed by Devaraj Das.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=413958&r1=413957&r2=413958&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jun 13 12:18:43 2006
@@ -1,6 +1,13 @@
Hadoop Change Log
+Trunk (unreleased changes)
+
+ 1. HADOOP-210. Change RPC client and server so that server uses a
+ selector, and a thread per connection is no longer required. This
+ should permit larger clusters. (Devaraj Das via cutting)
+
+
Release 0.3.2 - 2006-06-09
1. HADOOP-275. Update the streaming contrib module to use log4j for
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=413958&r1=413957&r2=413958&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java Tue Jun 13 12:18:43 2006
@@ -38,6 +38,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.DataOutputBuffer;
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -196,8 +197,15 @@
LOG.debug(getName() + " sending #" + call.id);
try {
writingCall = call;
- out.writeInt(call.id);
- call.param.write(out);
+ DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
+ //data to be written
+ d.writeInt(call.id);
+ call.param.write(d);
+ byte[] data = d.getData();
+ int dataLength = d.getLength();
+
+ out.writeInt(dataLength); //first put the data length
+ out.write(data, 0, dataLength);//write the data
out.flush();
} finally {
writingCall = null;
@@ -208,7 +216,7 @@
if (error)
close(); // close on error
}
- }
+ }
/** Close the connection and remove it from the pool. */
public void close() {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=413958&r1=413957&r2=413958&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Tue Jun 13 12:18:43 2006
@@ -20,17 +20,23 @@
import java.io.EOFException;
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
import java.io.StringWriter;
import java.io.PrintWriter;
+import java.io.ByteArrayInputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.BufferUnderflowException;
+
+import java.net.InetSocketAddress;
import java.net.Socket;
-import java.net.ServerSocket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
import java.util.LinkedList;
+import java.util.Iterator;
+import java.util.Random;
import org.apache.commons.logging.*;
@@ -38,7 +44,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.UTF8;
+
+import org.mortbay.http.nio.SocketChannelOutputStream;
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -65,6 +72,16 @@
private int handlerCount; // number of handler threads
private int maxQueuedCalls; // max number of queued calls
private Class paramClass; // class of call parameters
+ private int maxIdleTime; // the maximum idle time after
+ // which a client may be disconnected
+ private int thresholdIdleConnections; // the number of idle connections
+ // after which we will start
+ // cleaning up idle
+ // connections
+ int maxConnectionsToNuke; // the max number of
+ // connections to nuke
+ //during a cleanup
+
private Configuration conf;
private int timeout;
@@ -73,6 +90,12 @@
private LinkedList callQueue = new LinkedList(); // queued calls
private Object callDequeued = new Object(); // used by wait/notify
+ private InetSocketAddress address; //the address we bind at
+ private ServerSocketChannel acceptChannel = null; //the (main) accept channel
+ private Selector selector = null; //the selector that we use for the server
+ private Listener listener;
+ private int numConnections = 0;
+
/** A call queued for handling. */
private static class Call {
private int id; // the client's call id
@@ -86,113 +109,300 @@
}
}
- /** Listens on the socket, starting new connection threads. */
+ /** Listens on the socket. Creates jobs for the handler threads*/
private class Listener extends Thread {
- private ServerSocket socket;
-
- public Listener() throws IOException {
- this.socket = new ServerSocket(port);
- socket.setSoTimeout(timeout);
+
+ private LinkedList connectionList = new LinkedList(); //maintain a list
+ //of client connectionss
+ private Random rand = new Random();
+ private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
+ //-tion (for idle connections) ran
+ private int cleanupInterval = 10000; //the minimum interval between
+ //two cleanup runs
+
+ public Listener() {
+ address = new InetSocketAddress(port);
this.setDaemon(true);
- this.setName("Server listener on port " + port);
+ }
+ /** cleanup connections from connectionList. Choose a random range
+ * to scan and also have a limit on the number of the connections
+ * that will be cleanedup per run. The criteria for cleanup is the time
+ * for which the connection was idle. If 'force' is true then all
+ * connections will be looked at for the cleanup.
+ */
+ private void cleanupConnections(boolean force) {
+ if (force || numConnections > thresholdIdleConnections) {
+ long currentTime = System.currentTimeMillis();
+ if (!force && (int)(currentTime - lastCleanupRunTime) < cleanupInterval) {
+ return;
+ }
+ int start = 0;
+ int end = numConnections - 1;
+ if (!force) {
+ start = rand.nextInt() % numConnections;
+ end = rand.nextInt() % numConnections;
+ int temp;
+ if (end < start) {
+ temp = start;
+ start = end;
+ end = temp;
+ }
+ }
+ int i = start;
+ int numNuked = 0;
+ while (i <= end) {
+ Connection c = (Connection)connectionList.get(i);
+ if (c.timedOut(currentTime)) {
+ connectionList.remove(i);
+ try {
+ LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
+ c.close();
+ } catch (Exception e) {}
+ numNuked++;
+ end--;
+ if (!force && numNuked == maxConnectionsToNuke) break;
+ }
+ else i++;
+ }
+ lastCleanupRunTime = System.currentTimeMillis();
+ }
}
public void run() {
- LOG.info(getName() + ": starting");
- while (running) {
- Socket acceptedSock = null;
- try {
- acceptedSock = socket.accept();
- new Connection(acceptedSock).start(); // start a new connection
- } catch (SocketTimeoutException e) { // ignore timeouts
- } catch (OutOfMemoryError e) {
- // we can run out of memory if we have too many threads
- // log the event and sleep for a minute and give
- // some thread(s) a chance to finish
- LOG.warn(getName() + " out of memory, sleeping...", e);
+ SERVER.set(Server.this);
+
+ try {
+ // Create a new server socket and set to non blocking mode
+ acceptChannel = ServerSocketChannel.open();
+ acceptChannel.configureBlocking(false);
+
+ // Bind the server socket to the local host and port
+ acceptChannel.socket().bind(address);
+
+ // create a selector;
+ selector= Selector.open();
+
+ // Register accepts on the server socket with the selector.
+ acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
+ this.setName("Server listener on port " + port);
+
+ LOG.info(getName() + ": starting");
+
+ while (running) {
+ SelectionKey key = null;
try {
- acceptedSock.close();
+ selector.select(timeout);
+ Iterator iter = selector.selectedKeys().iterator();
+
+ while (iter.hasNext()) {
+ key = (SelectionKey)iter.next();
+ if (key.isAcceptable())
+ doAccept(key);
+ else if (key.isReadable())
+ doRead(key);
+ iter.remove();
+ key = null;
+ }
+ } catch (OutOfMemoryError e) {
+ closeCurrentConnection(key, e);
+ cleanupConnections(true);
Thread.sleep(60000);
- } catch (InterruptedException ie) { // ignore interrupts
- } catch (IOException ioe) { // ignore IOexceptions
- }
+ } catch (Exception e) {
+ closeCurrentConnection(key, e);
+ }
+ cleanupConnections(false);
}
- catch (Exception e) { // log all other exceptions
- LOG.info(getName() + " caught: " + e, e);
- }
+ } catch (Exception e) {
+ LOG.fatal("selector",e);
}
+ LOG.info("Stopping " + this.getName());
+
try {
- socket.close();
- } catch (IOException e) {
- LOG.info(getName() + ": e=" + e);
+ if (acceptChannel != null)
+ acceptChannel.close();
+ if (selector != null)
+ selector.close();
+ } catch (IOException e) { }
+
+ selector= null;
+ acceptChannel= null;
+ connectionList = null;
+ }
+
+ private void closeCurrentConnection(SelectionKey key, Throwable e) {
+ if (running) {
+ LOG.warn("selector: " + e);
+ e.printStackTrace();
}
- LOG.info(getName() + ": exiting");
+ if (key != null) {
+ Connection c = (Connection)key.attachment();
+ if (c != null) {
+ connectionList.remove(c);
+ try {
+ LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
+ c.close();
+ } catch (Exception ex) {}
+ }
+ }
+ }
+
+ void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
+ Connection c = null;
+ ServerSocketChannel server = (ServerSocketChannel) key.channel();
+ SocketChannel channel = server.accept();
+ channel.configureBlocking(false);
+ SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
+ c = new Connection(readKey, channel, System.currentTimeMillis());
+ readKey.attach(c);
+ connectionList.addLast(c);
+ numConnections++;
+ LOG.info("Server connection on port " + port + " from " +
+ c.getHostAddress() + ": starting");
+ }
+
+ void doRead(SelectionKey key) {
+ int count = 0;
+ if (!key.isValid() || !key.isReadable())
+ return;
+ Connection c = (Connection)key.attachment();
+ if (c == null) {
+ return;
+ }
+ c.setLastContact(System.currentTimeMillis());
+
+ try {
+ count = c.readAndProcess();
+ } catch (Exception e) {
+ LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count);
+ count = -1; //so that the (count < 0) block is executed
+ }
+ if (count < 0) {
+ connectionList.remove(c);
+ try {
+ LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
+ c.close();
+ } catch (Exception e) {}
+ }
+ else {
+ c.setLastContact(System.currentTimeMillis());
+ }
+ }
+
+ void doStop()
+ {
+ selector.wakeup();
+ Thread.yield();
}
}
/** Reads calls from a connection and queues them for handling. */
- private class Connection extends Thread {
- private Socket socket;
- private DataInputStream in;
+ private class Connection {
+ private SocketChannel channel;
+ private SelectionKey key;
+ private ByteBuffer data;
+ private ByteBuffer dataLengthBuffer;
private DataOutputStream out;
+ private long lastContact;
+ private int dataLength;
+ private Socket socket;
- public Connection(Socket socket) throws IOException {
- this.socket = socket;
- socket.setSoTimeout(timeout);
- this.in = new DataInputStream
- (new BufferedInputStream(socket.getInputStream()));
+ public Connection(SelectionKey key, SocketChannel channel,
+ long lastContact) {
+ this.key = key;
+ this.channel = channel;
+ this.lastContact = lastContact;
+ this.data = null;
+ this.dataLengthBuffer = null;
+ this.socket = channel.socket();
this.out = new DataOutputStream
- (new BufferedOutputStream(socket.getOutputStream()));
- this.setDaemon(true);
- this.setName("Server connection on port " + port + " from "
- + socket.getInetAddress().getHostAddress());
+ (new SocketChannelOutputStream(channel, 4096));
+ }
+
+ public String getHostAddress() {
+ return socket.getInetAddress().getHostAddress();
}
- public void run() {
- LOG.info(getName() + ": starting");
- SERVER.set(Server.this);
- try {
- while (running) {
- int id;
- try {
- id = in.readInt(); // try to read an id
- } catch (SocketTimeoutException e) {
- continue;
- }
-
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + " got #" + id);
-
- Writable param = makeParam(); // read param
- param.readFields(in);
+ public void setLastContact(long lastContact) {
+ this.lastContact = lastContact;
+ }
+
+ public long getLastContact() {
+ return lastContact;
+ }
+
+ private boolean timedOut() {
+ if(System.currentTimeMillis() - lastContact > maxIdleTime)
+ return true;
+ return false;
+ }
+
+ private boolean timedOut(long currentTime) {
+ if(currentTime - lastContact > timeout)
+ return true;
+ return false;
+ }
+
+ public int readAndProcess() throws IOException, InterruptedException {
+ int count = -1;
+ if (dataLengthBuffer == null)
+ dataLengthBuffer = ByteBuffer.allocateDirect(4);
+ if (dataLengthBuffer.remaining() > 0) {
+ count = channel.read(dataLengthBuffer);
+ if (count < 0) return count;
+ if (dataLengthBuffer.remaining() == 0) {
+ dataLengthBuffer.flip();
+ dataLength = dataLengthBuffer.getInt();
+ data = ByteBuffer.allocateDirect(dataLength);
+ }
+ return count;
+ }
+ count = channel.read(data);
+ if (data.remaining() == 0) {
+ data.flip();
+ processData();
+ data = dataLengthBuffer = null;
+ }
+ return count;
+ }
+
+ private void processData() throws IOException, InterruptedException {
+ byte[] bytes = new byte[dataLength];
+ data.get(bytes);
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
+ int id = dis.readInt(); // try to read an id
- Call call = new Call(id, param, this);
+ if (LOG.isDebugEnabled())
+ LOG.debug(" got #" + id);
+
+ Writable param = makeParam(); // read param
+ param.readFields(dis);
- synchronized (callQueue) {
- callQueue.addLast(call); // queue the call
- callQueue.notify(); // wake up a waiting handler
- }
+ Call call = new Call(id, param, this);
+ synchronized (callQueue) {
+ callQueue.addLast(call); // queue the call
+ callQueue.notify(); // wake up a waiting handler
+ }
- while (running && callQueue.size() >= maxQueuedCalls) {
- synchronized (callDequeued) { // queue is full
- callDequeued.wait(timeout); // wait for a dequeue
- }
- }
+ while (running && callQueue.size() >= maxQueuedCalls) {
+ synchronized (callDequeued) { // queue is full
+ callDequeued.wait(timeout); // wait for a dequeue
}
- } catch (EOFException eof) {
- // This is what happens on linux when the other side shuts down
- } catch (SocketException eof) {
- // This is what happens on Win32 when the other side shuts down
- } catch (Exception e) {
- LOG.info(getName() + " caught: " + e, e);
- } finally {
- try {
- socket.close();
- } catch (IOException e) {}
- LOG.info(getName() + ": exiting");
}
}
+ private void close() throws IOException {
+ data = null;
+ dataLengthBuffer = null;
+ if (!channel.isOpen())
+ return;
+ socket.shutdownOutput();
+ channel.close();
+ socket.close();
+ channel.close();
+ out.close();
+ key.cancel();
+ numConnections--;
+ }
}
/** Handles queued calls . */
@@ -245,7 +455,6 @@
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
}
- out.flush();
}
} catch (Exception e) {
@@ -275,7 +484,10 @@
this.paramClass = paramClass;
this.handlerCount = handlerCount;
this.maxQueuedCalls = handlerCount;
- this.timeout = conf.getInt("ipc.client.timeout",10000);
+ this.timeout = conf.getInt("ipc.client.timeout",10000);
+ this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
+ this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
+ this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 120000);
}
/** Sets the timeout used for network i/o. */
@@ -283,7 +495,7 @@
/** Starts the service. Must be called before any calls will be handled. */
public synchronized void start() throws IOException {
- Listener listener = new Listener();
+ listener = new Listener();
listener.start();
for (int i = 0; i < handlerCount; i++) {
@@ -298,6 +510,7 @@
public synchronized void stop() {
LOG.info("Stopping server on " + port);
running = false;
+ listener.doStop();
try {
Thread.sleep(timeout); // inexactly wait for pending requests to finish
} catch (InterruptedException e) {}