You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/06/15 00:41:25 UTC
svn commit: r414404 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/ipc/Client.java
src/java/org/apache/hadoop/ipc/Server.java
Author: cutting
Date: Wed Jun 14 15:41:25 2006
New Revision: 414404
URL: http://svn.apache.org/viewvc?rev=414404&view=rev
Log:
Reverting patch for HADOOP-210, which was causing problems.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=414404&r1=414403&r2=414404&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Jun 14 15:41:25 2006
@@ -3,11 +3,7 @@
Trunk (unreleased changes)
- 1. HADOOP-210. Change RPC client and server so that server uses a
- selector, and a thread per connection is no longer required. This
- should permit larger clusters. (Devaraj Das via cutting)
-
- 2. HADOOP-298. Improved progress reports for CopyFiles utility, the
+ 1. HADOOP-298. Improved progress reports for CopyFiles utility, the
distributed file copier. (omalley via cutting)
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=414404&r1=414403&r2=414404&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java Wed Jun 14 15:41:25 2006
@@ -38,7 +38,6 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.DataOutputBuffer;
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -197,15 +196,8 @@
LOG.debug(getName() + " sending #" + call.id);
try {
writingCall = call;
- DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
- //data to be written
- d.writeInt(call.id);
- call.param.write(d);
- byte[] data = d.getData();
- int dataLength = d.getLength();
-
- out.writeInt(dataLength); //first put the data length
- out.write(data, 0, dataLength);//write the data
+ out.writeInt(call.id);
+ call.param.write(out);
out.flush();
} finally {
writingCall = null;
@@ -216,7 +208,7 @@
if (error)
close(); // close on error
}
- }
+ }
/** Close the connection and remove it from the pool. */
public void close() {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=414404&r1=414403&r2=414404&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Wed Jun 14 15:41:25 2006
@@ -20,23 +20,17 @@
import java.io.EOFException;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.StringWriter;
import java.io.PrintWriter;
-import java.io.ByteArrayInputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.BufferUnderflowException;
-
-import java.net.InetSocketAddress;
import java.net.Socket;
+import java.net.ServerSocket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
import java.util.LinkedList;
-import java.util.Iterator;
-import java.util.Random;
import org.apache.commons.logging.*;
@@ -44,8 +38,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
-
-import org.mortbay.http.nio.SocketChannelOutputStream;
+import org.apache.hadoop.io.UTF8;
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -72,16 +65,6 @@
private int handlerCount; // number of handler threads
private int maxQueuedCalls; // max number of queued calls
private Class paramClass; // class of call parameters
- private int maxIdleTime; // the maximum idle time after
- // which a client may be disconnected
- private int thresholdIdleConnections; // the number of idle connections
- // after which we will start
- // cleaning up idle
- // connections
- int maxConnectionsToNuke; // the max number of
- // connections to nuke
- //during a cleanup
-
private Configuration conf;
private int timeout;
@@ -90,12 +73,6 @@
private LinkedList callQueue = new LinkedList(); // queued calls
private Object callDequeued = new Object(); // used by wait/notify
- private InetSocketAddress address; //the address we bind at
- private ServerSocketChannel acceptChannel = null; //the (main) accept channel
- private Selector selector = null; //the selector that we use for the server
- private Listener listener;
- private int numConnections = 0;
-
/** A call queued for handling. */
private static class Call {
private int id; // the client's call id
@@ -109,300 +86,113 @@
}
}
- /** Listens on the socket. Creates jobs for the handler threads*/
+ /** Listens on the socket, starting new connection threads. */
private class Listener extends Thread {
-
- private LinkedList connectionList = new LinkedList(); //maintain a list
- //of client connectionss
- private Random rand = new Random();
- private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
- //-tion (for idle connections) ran
- private int cleanupInterval = 10000; //the minimum interval between
- //two cleanup runs
-
- public Listener() {
- address = new InetSocketAddress(port);
+ private ServerSocket socket;
+
+ public Listener() throws IOException {
+ this.socket = new ServerSocket(port);
+ socket.setSoTimeout(timeout);
this.setDaemon(true);
- }
- /** cleanup connections from connectionList. Choose a random range
- * to scan and also have a limit on the number of the connections
- * that will be cleanedup per run. The criteria for cleanup is the time
- * for which the connection was idle. If 'force' is true then all
- * connections will be looked at for the cleanup.
- */
- private void cleanupConnections(boolean force) {
- if (force || numConnections > thresholdIdleConnections) {
- long currentTime = System.currentTimeMillis();
- if (!force && (int)(currentTime - lastCleanupRunTime) < cleanupInterval) {
- return;
- }
- int start = 0;
- int end = numConnections - 1;
- if (!force) {
- start = rand.nextInt() % numConnections;
- end = rand.nextInt() % numConnections;
- int temp;
- if (end < start) {
- temp = start;
- start = end;
- end = temp;
- }
- }
- int i = start;
- int numNuked = 0;
- while (i <= end) {
- Connection c = (Connection)connectionList.get(i);
- if (c.timedOut(currentTime)) {
- connectionList.remove(i);
- try {
- LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
- c.close();
- } catch (Exception e) {}
- numNuked++;
- end--;
- if (!force && numNuked == maxConnectionsToNuke) break;
- }
- else i++;
- }
- lastCleanupRunTime = System.currentTimeMillis();
- }
+ this.setName("Server listener on port " + port);
}
public void run() {
- SERVER.set(Server.this);
-
- try {
- // Create a new server socket and set to non blocking mode
- acceptChannel = ServerSocketChannel.open();
- acceptChannel.configureBlocking(false);
-
- // Bind the server socket to the local host and port
- acceptChannel.socket().bind(address);
-
- // create a selector;
- selector= Selector.open();
-
- // Register accepts on the server socket with the selector.
- acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
- this.setName("Server listener on port " + port);
-
- LOG.info(getName() + ": starting");
-
- while (running) {
- SelectionKey key = null;
+ LOG.info(getName() + ": starting");
+ while (running) {
+ Socket acceptedSock = null;
+ try {
+ acceptedSock = socket.accept();
+ new Connection(acceptedSock).start(); // start a new connection
+ } catch (SocketTimeoutException e) { // ignore timeouts
+ } catch (OutOfMemoryError e) {
+ // we can run out of memory if we have too many threads
+ // log the event and sleep for a minute and give
+ // some thread(s) a chance to finish
+ LOG.warn(getName() + " out of memory, sleeping...", e);
try {
- selector.select(timeout);
- Iterator iter = selector.selectedKeys().iterator();
-
- while (iter.hasNext()) {
- key = (SelectionKey)iter.next();
- if (key.isAcceptable())
- doAccept(key);
- else if (key.isReadable())
- doRead(key);
- iter.remove();
- key = null;
- }
- } catch (OutOfMemoryError e) {
- closeCurrentConnection(key, e);
- cleanupConnections(true);
+ acceptedSock.close();
Thread.sleep(60000);
- } catch (Exception e) {
- closeCurrentConnection(key, e);
- }
- cleanupConnections(false);
- }
- } catch (Exception e) {
- LOG.fatal("selector",e);
- }
- LOG.info("Stopping " + this.getName());
-
- try {
- if (acceptChannel != null)
- acceptChannel.close();
- if (selector != null)
- selector.close();
- } catch (IOException e) { }
-
- selector= null;
- acceptChannel= null;
- connectionList = null;
- }
-
- private void closeCurrentConnection(SelectionKey key, Throwable e) {
- if (running) {
- LOG.warn("selector: " + e);
- e.printStackTrace();
- }
- if (key != null) {
- Connection c = (Connection)key.attachment();
- if (c != null) {
- connectionList.remove(c);
- try {
- LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
- c.close();
- } catch (Exception ex) {}
+ } catch (InterruptedException ie) { // ignore interrupts
+ } catch (IOException ioe) { // ignore IOexceptions
+ }
}
+ catch (Exception e) { // log all other exceptions
+ LOG.info(getName() + " caught: " + e, e);
+ }
}
- }
-
- void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
- Connection c = null;
- ServerSocketChannel server = (ServerSocketChannel) key.channel();
- SocketChannel channel = server.accept();
- channel.configureBlocking(false);
- SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
- c = new Connection(readKey, channel, System.currentTimeMillis());
- readKey.attach(c);
- connectionList.addLast(c);
- numConnections++;
- LOG.info("Server connection on port " + port + " from " +
- c.getHostAddress() + ": starting");
- }
-
- void doRead(SelectionKey key) {
- int count = 0;
- if (!key.isValid() || !key.isReadable())
- return;
- Connection c = (Connection)key.attachment();
- if (c == null) {
- return;
- }
- c.setLastContact(System.currentTimeMillis());
-
try {
- count = c.readAndProcess();
- } catch (Exception e) {
- LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count);
- count = -1; //so that the (count < 0) block is executed
+ socket.close();
+ } catch (IOException e) {
+ LOG.info(getName() + ": e=" + e);
}
- if (count < 0) {
- connectionList.remove(c);
- try {
- LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
- c.close();
- } catch (Exception e) {}
- }
- else {
- c.setLastContact(System.currentTimeMillis());
- }
- }
-
- void doStop()
- {
- selector.wakeup();
- Thread.yield();
+ LOG.info(getName() + ": exiting");
}
}
/** Reads calls from a connection and queues them for handling. */
- private class Connection {
- private SocketChannel channel;
- private SelectionKey key;
- private ByteBuffer data;
- private ByteBuffer dataLengthBuffer;
- private DataOutputStream out;
- private long lastContact;
- private int dataLength;
+ private class Connection extends Thread {
private Socket socket;
+ private DataInputStream in;
+ private DataOutputStream out;
- public Connection(SelectionKey key, SocketChannel channel,
- long lastContact) {
- this.key = key;
- this.channel = channel;
- this.lastContact = lastContact;
- this.data = null;
- this.dataLengthBuffer = null;
- this.socket = channel.socket();
+ public Connection(Socket socket) throws IOException {
+ this.socket = socket;
+ socket.setSoTimeout(timeout);
+ this.in = new DataInputStream
+ (new BufferedInputStream(socket.getInputStream()));
this.out = new DataOutputStream
- (new SocketChannelOutputStream(channel, 4096));
- }
-
- public String getHostAddress() {
- return socket.getInetAddress().getHostAddress();
- }
-
- public void setLastContact(long lastContact) {
- this.lastContact = lastContact;
- }
-
- public long getLastContact() {
- return lastContact;
- }
-
- private boolean timedOut() {
- if(System.currentTimeMillis() - lastContact > maxIdleTime)
- return true;
- return false;
- }
-
- private boolean timedOut(long currentTime) {
- if(currentTime - lastContact > timeout)
- return true;
- return false;
- }
-
- public int readAndProcess() throws IOException, InterruptedException {
- int count = -1;
- if (dataLengthBuffer == null)
- dataLengthBuffer = ByteBuffer.allocateDirect(4);
- if (dataLengthBuffer.remaining() > 0) {
- count = channel.read(dataLengthBuffer);
- if (count < 0) return count;
- if (dataLengthBuffer.remaining() == 0) {
- dataLengthBuffer.flip();
- dataLength = dataLengthBuffer.getInt();
- data = ByteBuffer.allocateDirect(dataLength);
- }
- return count;
- }
- count = channel.read(data);
- if (data.remaining() == 0) {
- data.flip();
- processData();
- data = dataLengthBuffer = null;
- }
- return count;
+ (new BufferedOutputStream(socket.getOutputStream()));
+ this.setDaemon(true);
+ this.setName("Server connection on port " + port + " from "
+ + socket.getInetAddress().getHostAddress());
}
- private void processData() throws IOException, InterruptedException {
- byte[] bytes = new byte[dataLength];
- data.get(bytes);
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
- int id = dis.readInt(); // try to read an id
+ public void run() {
+ LOG.info(getName() + ": starting");
+ SERVER.set(Server.this);
+ try {
+ while (running) {
+ int id;
+ try {
+ id = in.readInt(); // try to read an id
+ } catch (SocketTimeoutException e) {
+ continue;
+ }
- if (LOG.isDebugEnabled())
- LOG.debug(" got #" + id);
-
- Writable param = makeParam(); // read param
- param.readFields(dis);
+ if (LOG.isDebugEnabled())
+ LOG.debug(getName() + " got #" + id);
- Call call = new Call(id, param, this);
- synchronized (callQueue) {
- callQueue.addLast(call); // queue the call
- callQueue.notify(); // wake up a waiting handler
- }
+ Writable param = makeParam(); // read param
+ param.readFields(in);
+
+ Call call = new Call(id, param, this);
+
+ synchronized (callQueue) {
+ callQueue.addLast(call); // queue the call
+ callQueue.notify(); // wake up a waiting handler
+ }
- while (running && callQueue.size() >= maxQueuedCalls) {
- synchronized (callDequeued) { // queue is full
- callDequeued.wait(timeout); // wait for a dequeue
+ while (running && callQueue.size() >= maxQueuedCalls) {
+ synchronized (callDequeued) { // queue is full
+ callDequeued.wait(timeout); // wait for a dequeue
+ }
+ }
}
+ } catch (EOFException eof) {
+ // This is what happens on linux when the other side shuts down
+ } catch (SocketException eof) {
+ // This is what happens on Win32 when the other side shuts down
+ } catch (Exception e) {
+ LOG.info(getName() + " caught: " + e, e);
+ } finally {
+ try {
+ socket.close();
+ } catch (IOException e) {}
+ LOG.info(getName() + ": exiting");
}
}
- private void close() throws IOException {
- data = null;
- dataLengthBuffer = null;
- if (!channel.isOpen())
- return;
- socket.shutdownOutput();
- channel.close();
- socket.close();
- channel.close();
- out.close();
- key.cancel();
- numConnections--;
- }
}
/** Handles queued calls . */
@@ -455,6 +245,7 @@
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
}
+ out.flush();
}
} catch (Exception e) {
@@ -484,10 +275,7 @@
this.paramClass = paramClass;
this.handlerCount = handlerCount;
this.maxQueuedCalls = handlerCount;
- this.timeout = conf.getInt("ipc.client.timeout",10000);
- this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
- this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
- this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 120000);
+ this.timeout = conf.getInt("ipc.client.timeout",10000);
}
/** Sets the timeout used for network i/o. */
@@ -495,7 +283,7 @@
/** Starts the service. Must be called before any calls will be handled. */
public synchronized void start() throws IOException {
- listener = new Listener();
+ Listener listener = new Listener();
listener.start();
for (int i = 0; i < handlerCount; i++) {
@@ -510,7 +298,6 @@
public synchronized void stop() {
LOG.info("Stopping server on " + port);
running = false;
- listener.doStop();
try {
Thread.sleep(timeout); // inexactly wait for pending requests to finish
} catch (InterruptedException e) {}