You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2007/12/13 02:04:48 UTC
svn commit: r603795 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/ipc/Client.java
src/java/org/apache/hadoop/ipc/RPC.java
src/java/org/apache/hadoop/ipc/Server.java
src/test/org/apache/hadoop/ipc/TestRPC.java
Author: dhruba
Date: Wed Dec 12 17:04:47 2007
New Revision: 603795
URL: http://svn.apache.org/viewvc?rev=603795&view=rev
Log:
HADOOP-1841. Prevent slow clients from consuming threads in the NameNode.
(dhruba)
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=603795&r1=603794&r2=603795&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Dec 12 17:04:47 2007
@@ -238,6 +238,9 @@
HADOOP-2359. Remove warning for interruptted exception when closing down
minidfs. (dhruba via omalley)
+ HADOOP-1841. Prevent slow clients from consuming threads in the NameNode.
+ (dhruba)
+
Branch 0.15 (unreleased)
BUG FIXES
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=603795&r1=603794&r2=603795&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java Wed Dec 12 17:04:47 2007
@@ -73,6 +73,7 @@
private int maxRetries; //the max. no. of retries for socket connections
private Thread connectionCullerThread;
private SocketFactory socketFactory; // how to create sockets
+ private boolean simulateError = false; // unit tests
/** A call waiting for a value. */
private class Call {
@@ -286,6 +287,7 @@
} else {
Writable value = (Writable)ReflectionUtils.newInstance(valueClass, conf);
try {
+ waitForEndSimulation();
readingCall = call;
value.readFields(in); // read value
} finally {
@@ -609,4 +611,19 @@
return address.hashCode() ^ System.identityHashCode(ticket);
}
}
+
+  // Guards simulateError. The flag is written by the (test) thread calling
+  // simulateError(boolean) and polled by the thread reading responses; the
+  // field itself is not volatile, so every access goes through this lock to
+  // make updates visible across threads.
+  private final Object simulateErrorLock = new Object();
+
+  /**
+   * Unit-test hook. While {@code flag} is true, the thread reading responses
+   * for this client stalls before reading each response value, simulating a
+   * client that does not consume its RPC responses.
+   */
+  void simulateError(boolean flag) {
+    synchronized (simulateErrorLock) {
+      simulateError = flag;
+    }
+  }
+
+  /** Returns the current value of the simulateError flag. */
+  private boolean isSimulatingError() {
+    synchronized (simulateErrorLock) {
+      return simulateError;
+    }
+  }
+
+  // If errors are being simulated, then wait (polling once a second).
+  // An interrupt ends the wait early and is re-asserted on the thread so
+  // that callers further up can still observe it.
+  private void waitForEndSimulation() {
+    while (isSimulatingError()) {
+      LOG.info("RPC Client waiting for simulation to end");
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=603795&r1=603794&r2=603795&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java Wed Dec 12 17:04:47 2007
@@ -30,6 +30,7 @@
import java.io.*;
import java.util.Map;
import java.util.HashMap;
+import java.util.Collection;
import javax.net.SocketFactory;
@@ -168,6 +169,17 @@
CLIENTS.clear();
}
+  /**
+   * Removes all clients from the cache of clients. This only drops the
+   * cached references; it does not stop the removed clients.
+   */
+  static synchronized void removeClients() {
+    CLIENTS.clear();
+  }
+
+  /**
+   * Returns a snapshot of the currently cached clients. A copy is returned
+   * (rather than the live values() view) so callers can iterate it without
+   * holding the RPC class lock and without racing concurrent cache updates.
+   */
+  static synchronized Collection allClients() {
+    return new java.util.ArrayList<Object>(CLIENTS.values());
+  }
+
private static class Invoker implements InvocationHandler {
private InetSocketAddress address;
private UserGroupInformation ticket;
@@ -415,5 +427,4 @@
value = value.substring(0, 55)+"...";
LOG.info(value);
}
-
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=603795&r1=603794&r2=603795&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Wed Dec 12 17:04:47 2007
@@ -23,6 +23,7 @@
import java.io.DataOutputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
@@ -44,12 +45,13 @@
import java.util.Iterator;
import java.util.Random;
-import org.apache.commons.logging.*;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.ipc.SocketChannelOutputStream;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.*;
@@ -145,6 +147,7 @@
private int timeout;
private long maxCallStartAge;
private int maxQueueSize;
+ private int socketSendBufferSize;
volatile private boolean running = true; // true while server runs
private LinkedList<Call> callQueue = new LinkedList<Call>(); // queued calls
@@ -154,6 +157,7 @@
//maintain a list
//of client connections
private Listener listener = null;
+ private Responder responder = null;
private int numConnections = 0;
private Handler[] handlers = null;
@@ -191,17 +195,24 @@
private Writable param; // the parameter passed
private Connection connection; // connection to client
private long receivedTime; // the time received
+ private ByteBuffer response; // the response for this call
public Call(int id, Writable param, Connection connection) {
this.id = id;
this.param = param;
this.connection = connection;
this.receivedTime = System.currentTimeMillis();
+ this.response = null;
}
+ @Override
public String toString() {
return param.toString() + " from " + connection.toString();
}
+
+    /** Stores the serialized response bytes to be sent back for this call. */
+    public void setResponse(ByteBuffer buffer) {
+      response = buffer;
+    }
}
/** Listens on the socket. Creates jobs for the handler threads*/
@@ -288,6 +299,7 @@
}
}
+ @Override
public void run() {
LOG.info(getName() + ": starting");
SERVER.set(Server.this);
@@ -428,6 +440,234 @@
}
}
+  /**
+   * Sends RPC responses back to clients.
+   *
+   * Handler threads hand finished calls to {@link #doRespond(Call)}, which
+   * queues them per connection and tries an immediate non-blocking write.
+   * A response that cannot be written in full stays at the head of its
+   * connection's response queue and the channel is registered with
+   * {@code writeSelector}; this thread finishes the write when the channel
+   * becomes writable, and periodically purges responses older than
+   * {@code maxCallStartAge}.
+   */
+  private class Responder extends Thread {
+    private final Selector writeSelector;
+    private boolean pending;  // a channel is being registered; guarded by this
+
+    Responder() throws IOException {
+      this.setName("IPC Server Responder");
+      this.setDaemon(true);
+      writeSelector = Selector.open(); // create a selector
+      pending = false;
+    }
+
+    @Override
+    public void run() {
+      LOG.info(getName() + ": starting");
+      SERVER.set(Server.this);
+      long lastPurgeTime = 0;   // last check for old calls.
+
+      try {
+        while (running) {
+          try {
+            waitPending();     // If a channel is being registered, wait.
+            writeSelector.select(maxCallStartAge);
+            Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
+            while (iter.hasNext()) {
+              SelectionKey key = iter.next();
+              iter.remove();
+              try {
+                if (key.isValid() && key.isWritable()) {
+                  doAsyncWrite(key);
+                }
+              } catch (IOException e) {
+                LOG.info(getName() + ": doAsyncWrite threw exception " + e);
+                key.cancel();
+              }
+            }
+            long now = System.currentTimeMillis();
+            if (now < lastPurgeTime + maxCallStartAge) {
+              continue;
+            }
+            lastPurgeTime = now;
+            //
+            // If there were some calls that have not been sent out for a
+            // long time, discard them.
+            //
+            LOG.debug("Checking for old call responses.");
+            for (SelectionKey key : snapshotKeys()) {
+              try {
+                doPurge(key, now);
+              } catch (IOException e) {
+                LOG.warn("Error in purging old calls " + e);
+              }
+            }
+          } catch (OutOfMemoryError e) {
+            //
+            // we can run out of memory if we have too many threads
+            // log the event and sleep for a minute and give
+            // some thread(s) a chance to finish
+            //
+            LOG.warn("Out of Memory in server select", e);
+            try { Thread.sleep(60000); } catch (Exception ie) {}
+          } catch (Exception e) {
+            LOG.warn("Exception in Responder " +
+                     StringUtils.stringifyException(e));
+          }
+        }
+      } finally {
+        // Release the selector's file descriptors when the responder exits.
+        try {
+          writeSelector.close();
+        } catch (IOException e) {
+          LOG.warn(getName() + ": error closing write selector " + e);
+        }
+      }
+      LOG.info("Stopping " + this.getName());
+    }
+
+    // Copies the selector's key set. Handler threads may register channels
+    // concurrently and the key set's iterators are fail-fast, so the copy is
+    // taken while holding the set's monitor, as the Selector docs advise.
+    private SelectionKey[] snapshotKeys() {
+      synchronized (writeSelector.keys()) {
+        return writeSelector.keys().toArray(new SelectionKey[0]);
+      }
+    }
+
+    // Continues writing pending responses for the connection whose channel
+    // became writable; deregisters the key once nothing is left to send.
+    private void doAsyncWrite(SelectionKey key) throws IOException {
+      Call call = (Call)key.attachment();
+      if (call == null) {
+        return;
+      }
+      if (key.channel() != call.connection.channel) {
+        throw new IOException("doAsyncWrite: bad channel");
+      }
+      if (processResponse(call.connection.responseQueue)) {
+        key.cancel();                 // remove item from selector.
+      }
+    }
+
+    //
+    // Remove calls that have been pending in the responseQueue
+    // for a long time. A response that has already been partially written
+    // cannot be dropped without corrupting the byte stream seen by the
+    // client, so in that case the whole connection is closed instead.
+    //
+    private void doPurge(SelectionKey key, long now) throws IOException {
+      Call call = (Call)key.attachment();
+      if (call == null) {
+        return;
+      }
+      if (key.channel() != call.connection.channel) {
+        LOG.info("doPurge: bad channel");
+        return;
+      }
+      Connection connection = call.connection;
+      LinkedList<Call> responseQueue = connection.responseQueue;
+      synchronized (responseQueue) {
+        Iterator<Call> iter = responseQueue.listIterator(0);
+        while (iter.hasNext()) {
+          Call queued = iter.next();
+          if (now > queued.receivedTime + maxCallStartAge) {
+            if (queued.response.position() > 0) {
+              LOG.info(getName() + ", call " + queued +
+                       ": partially sent response too old (" +
+                       (now - queued.receivedTime) + "), closing connection");
+              responseQueue.clear();
+              closeConnection(connection);
+              break;
+            }
+            LOG.info(getName() + ", call " + queued +
+                     ": response discarded for being too old (" +
+                     (now - queued.receivedTime) + ")");
+            iter.remove();
+          }
+        }
+
+        // If all the calls for this channel were removed, then
+        // remove this channel from the selector
+        if (responseQueue.isEmpty()) {
+          key.cancel();
+        }
+      }
+    }
+
+    // Removes a connection from the server's connection list and closes it.
+    private void closeConnection(Connection connection) throws IOException {
+      synchronized (connectionList) {
+        if (connectionList.remove(connection))
+          numConnections--;
+      }
+      connection.close();
+    }
+
+    // Processes one response: writes as much of the first queued response as
+    // the channel accepts without blocking. Returns true if there is no more
+    // pending data for this channel (queue empty, last response fully sent,
+    // or the connection was closed after an output error).
+    //
+    private boolean processResponse(LinkedList<Call> responseQueue) throws IOException {
+      boolean error = true;
+      boolean done = false;            // there is more data for this channel.
+      int numElements = 0;
+      Call call = null;
+      try {
+        synchronized (responseQueue) {
+          //
+          // If there are no items for this channel, then we are done
+          //
+          numElements = responseQueue.size();
+          if (numElements == 0) {
+            error = false;
+            return true;              // no more data for this channel.
+          }
+          //
+          // Extract the first call
+          //
+          call = responseQueue.removeFirst();
+          SocketChannel channel = call.connection.channel;
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(getName() + ": responding to #" + call.id + " from " +
+                      call.connection);
+          }
+          //
+          // Send as much data as we can in the non-blocking fashion
+          //
+          int numBytes = channel.write(call.response);
+          if (!call.response.hasRemaining()) {
+            // Fully sent; done only if it was the last queued call.
+            done = (numElements == 1);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(getName() + ": responding to #" + call.id + " from " +
+                        call.connection + " Wrote " + numBytes + " bytes.");
+            }
+          } else {
+            //
+            // If we were unable to write the entire response out, then
+            // put it back at the head of the queue and register the channel
+            // with the selector so this thread finishes the write.
+            //
+            call.connection.responseQueue.addFirst(call);
+            setPending();
+            try {
+              // Wakeup the thread blocked on select, only then can the call
+              // to channel.register() complete.
+              writeSelector.wakeup();
+              SelectionKey writeKey = channel.register(writeSelector,
+                                                       SelectionKey.OP_WRITE);
+              writeKey.attach(call);
+            } finally {
+              clearPending();
+            }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(getName() + ": responding to #" + call.id + " from " +
+                        call.connection + " Wrote partial " + numBytes +
+                        " bytes.");
+            }
+            done = false;             // this call not fully processed.
+          }
+          error = false;              // everything went off well
+        }
+      } finally {
+        if (error && call != null) {
+          LOG.warn(getName()+", call " + call + ": output error");
+          done = true;               // error. no more data for this channel.
+          closeConnection(call.connection);
+        }
+      }
+      return done;
+    }
+
+    //
+    // Enqueue a response from the application. If it is the only queued
+    // response for its connection, try to send it right away.
+    //
+    void doRespond(Call call) throws IOException {
+      synchronized (call.connection.responseQueue) {
+        call.connection.responseQueue.addLast(call);
+        if (call.connection.responseQueue.size() == 1) {
+          processResponse(call.connection.responseQueue);
+        }
+      }
+    }
+
+    private synchronized void setPending() {   // call waiting to be enqueued.
+      pending = true;
+    }
+
+    private synchronized void clearPending() { // call done enqueueing.
+      pending = false;
+      notify();
+    }
+
+    // Blocks the responder while a handler is registering a channel, so that
+    // select() does not immediately re-block register().
+    private synchronized void waitPending() throws InterruptedException {
+      while (pending) {
+        wait();
+      }
+    }
+  }
+
/** Reads calls from a connection and queues them for handling. */
private class Connection {
private boolean versionRead = false; //if initial signature and
@@ -438,8 +678,7 @@
private SelectionKey key;
private ByteBuffer data;
private ByteBuffer dataLengthBuffer;
- private DataOutputStream out;
- private SocketChannelOutputStream channelOut;
+ private LinkedList<Call> responseQueue;
private long lastContact;
private int dataLength;
private Socket socket;
@@ -457,9 +696,6 @@
this.data = null;
this.dataLengthBuffer = ByteBuffer.allocate(4);
this.socket = channel.socket();
- this.out = new DataOutputStream
- (new BufferedOutputStream(
- this.channelOut = new SocketChannelOutputStream(channel)));
InetAddress addr = socket.getInetAddress();
if (addr == null) {
this.hostAddress = "*Unknown*";
@@ -467,8 +703,18 @@
this.hostAddress = addr.getHostAddress();
}
this.remotePort = socket.getPort();
+ this.responseQueue = new LinkedList<Call>();
+ if (socketSendBufferSize != 0) {
+ try {
+ socket.setSendBufferSize(socketSendBufferSize);
+ } catch (IOException e) {
+ LOG.warn("Connection: unable to set socket send buffer size to " +
+ socketSendBufferSize);
+ }
+ }
}
+ @Override
public String toString() {
return getHostAddress() + ":" + remotePort;
}
@@ -516,7 +762,9 @@
if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
//Warning is ok since this is not supposed to happen.
LOG.warn("Incorrect header or version mismatch from " +
- hostAddress + ":" + remotePort);
+ hostAddress + ":" + remotePort +
+ " got version " + version +
+ " expected version " + CURRENT_VERSION);
return -1;
}
dataLengthBuffer.clear();
@@ -589,8 +837,6 @@
if (!channel.isOpen())
return;
try {socket.shutdownOutput();} catch(Exception e) {}
- try {out.close();} catch(Exception e) {}
- try {channelOut.destroy();} catch(Exception e) {}
if (channel.isOpen()) {
try {channel.close();} catch(Exception e) {}
}
@@ -607,9 +853,11 @@
this.setName("IPC Server handler "+ instanceNumber + " on " + port);
}
+ @Override
public void run() {
LOG.info(getName() + ": starting");
SERVER.set(Server.this);
+ ByteArrayOutputStream buf = new ByteArrayOutputStream(10240);
while (running) {
try {
Call call;
@@ -648,28 +896,20 @@
error = StringUtils.stringifyException(e);
}
CurCall.set(null);
-
- DataOutputStream out = call.connection.out;
- synchronized (out) {
- try {
- out.writeInt(call.id); // write call id
- out.writeBoolean(error!=null); // write error flag
- if (error == null) {
- value.write(out);
- } else {
- WritableUtils.writeString(out, errorClass);
- WritableUtils.writeString(out, error);
- }
- out.flush();
- } catch (Exception e) {
- LOG.warn(getName()+", call "+call+": output error", e);
- synchronized (connectionList) {
- if (connectionList.remove(call.connection))
- numConnections--;
- }
- call.connection.close();
- }
+
+ buf.reset();
+ DataOutputStream out = new DataOutputStream(buf);
+ out.writeInt(call.id); // write call id
+ out.writeBoolean(error != null); // write error flag
+
+ if (error == null) {
+ value.write(out);
+ } else {
+ WritableUtils.writeString(out, errorClass);
+ WritableUtils.writeString(out, error);
}
+ call.setResponse(ByteBuffer.wrap(buf.toByteArray()));
+ responder.doRespond(call);
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
LOG.info(getName() + " caught: " + e, e);
@@ -695,6 +935,7 @@
this.paramClass = paramClass;
this.handlerCount = handlerCount;
this.timeout = conf.getInt("ipc.client.timeout", 10000);
+ this.socketSendBufferSize = 0;
maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME);
maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
@@ -704,13 +945,20 @@
// Start the listener here and let it bind to the port
listener = new Listener();
this.port = listener.getAddress().getPort();
+
+ // Create the responder here
+ responder = new Responder();
}
/** Sets the timeout used for network i/o. */
public void setTimeout(int timeout) { this.timeout = timeout; }
+ /**
+  * Sets the socket send buffer size used for responding to RPCs.
+  * The size is applied (via Socket.setSendBufferSize) when a Connection is
+  * constructed, so it only affects connections created after this call.
+  * A value of 0 leaves the socket's default send buffer size unchanged.
+  * NOTE(review): socketSendBufferSize is a plain (non-volatile) int; confirm
+  * callers set it before connections are accepted on another thread.
+  */
+ public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
+
/** Starts the service. Must be called before any calls will be handled. */
public synchronized void start() throws IOException {
+ responder.start();
listener.start();
handlers = new Handler[handlerCount];
@@ -733,6 +981,7 @@
}
listener.interrupt();
listener.doStop();
+ responder.interrupt();
notifyAll();
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java?rev=603795&r1=603794&r2=603795&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java Wed Dec 12 17:04:47 2007
@@ -25,6 +25,8 @@
import junit.framework.TestCase;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
import org.apache.commons.logging.*;
@@ -43,6 +45,9 @@
private static Configuration conf = new Configuration();
+ int datasize = 1024*100;
+ int numThreads = 50;
+
public TestRPC(String name) { super(name); }
public interface TestProtocol extends VersionedProtocol {
@@ -56,6 +61,7 @@
int add(int[] values) throws IOException;
int error() throws IOException;
void testServerGet() throws IOException;
+ int[] exchange(int[] values) throws IOException;
}
public class TestImpl implements TestProtocol {
@@ -95,8 +101,117 @@
}
}
+    // Overwrites each element of the received array with its index and
+    // returns the same array, so callers can verify a round-tripped payload.
+    public int[] exchange(int[] values) {
+      for (int i = 0; i < values.length; i++) {
+        values[i] = i;
+      }
+      return values;
+    }
+ }
+
+  //
+  // A runnable that performs two RPCs against the given proxy: exchange()
+  // of a datasize-element int array, and add(1, 2). It checks both results.
+  //
+  static class Transactions implements Runnable {
+    int datasize;
+    TestProtocol proxy;
+
+    Transactions(TestProtocol proxy, int datasize) {
+      this.proxy = proxy;
+      this.datasize = datasize;
+    }
+
+    // Do two RPCs that transfer data and verify the responses.
+    // NOTE(review): this runs on a worker thread, so a failed assertion here
+    // surfaces only through the thread's uncaught-exception handler; it does
+    // not by itself fail the JUnit test that started the thread.
+    public void run() {
+      int[] indata = new int[datasize];
+      int[] outdata = null;
+      int val = 0;
+      try {
+        outdata = proxy.exchange(indata);
+        val = proxy.add(1,2);
+      } catch (IOException e) {
+        fail("Exception from RPC exchange() " + e);
+      }
+      assertEquals(indata.length, outdata.length);
+      assertEquals(3, val);
+      for (int i = 0; i < outdata.length; i++) {
+        assertEquals(i, outdata[i]);
+      }
+    }
+  }
+
+  //
+  // A runnable that makes a single ping() RPC. Used with a client whose
+  // response reading is stalled, so the call does not complete until the
+  // simulated error is turned off.
+  //
+  static class SlowRPC implements Runnable {
+    private TestProtocol proxy;
+    private volatile boolean done;   // set once ping() has returned
+
+    SlowRPC(TestProtocol proxy) {
+      this.proxy = proxy;
+      done = false;
+    }
+
+    boolean isDone() {
+      return done;
+    }
+
+    public void run() {
+      try {
+        proxy.ping();    // blocks while the client is simulating an error
+        done = true;
+      } catch (IOException e) {
+        fail("SlowRPC ping exception " + e);
+      }
+    }
+  }
+
+  /**
+   * Verifies that a client which does not read its RPC response does not
+   * prevent other clients from being served by the same server.
+   */
+  public void testSlowRpc() throws Exception {
+    System.out.println("Testing Slow RPC");
+    Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, conf);
+    server.start();
+    Client client = null;
+    try {
+      InetSocketAddress addr = server.getListenerAddress();
+
+      // create a client and make an RPC that does not read its response
+      //
+      TestProtocol proxy1 =
+        (TestProtocol)RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
+      Collection collection = RPC.allClients();
+      assertEquals("There should be only one client.", 1, collection.size());
+      Iterator iter = collection.iterator();
+      client = (Client) iter.next();
+
+      client.simulateError(true);
+      // Drop the stalled client from the cache so proxy2 gets a fresh one.
+      RPC.removeClients();
+      SlowRPC slowrpc = new SlowRPC(proxy1);
+      Thread thread = new Thread(slowrpc, "SlowRPC");
+      thread.start();
+      assertFalse("Slow RPC should not have finished1.", slowrpc.isDone());
+
+      // create another client and make another RPC to the same server. This
+      // should complete even though the first one is still hanging.
+      //
+      TestProtocol proxy2 =
+        (TestProtocol)RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
+      proxy2.ping();
+
+      // verify that the first RPC is still stuck
+      assertFalse("Slow RPC should not have finished2.", slowrpc.isDone());
+
+      // Make the first RPC process its response, but bound the wait so a
+      // regression fails the test instead of hanging the suite.
+      client.simulateError(false);
+      long deadline = System.currentTimeMillis() + 60000;
+      while (!slowrpc.isDone() && System.currentTimeMillis() < deadline) {
+        System.out.println("Waiting for slow RPC to get done.");
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+      assertTrue("Slow RPC did not finish after the simulated error ended.",
+                 slowrpc.isDone());
+    } finally {
+      if (client != null) {
+        client.simulateError(false);   // never leave the reader stalled
+      }
+      server.stop();
+    }
 }
+
public void testCalls() throws Exception {
Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, conf);
server.start();
@@ -142,6 +257,26 @@
proxy.testServerGet();
+ // create multiple threads and make them do large data transfers
+ System.out.println("Starting multi-threaded RPC test...");
+ server.setSocketSendBufSize(1024);
+ Thread threadId[] = new Thread[numThreads];
+ for (int i = 0; i < numThreads; i++) {
+ Transactions trans = new Transactions(proxy, datasize);
+ threadId[i] = new Thread(trans, "TransactionThread-" + i);
+ threadId[i].start();
+ }
+
+ // wait for all transactions to get over
+ System.out.println("Waiting for all threads to finish RPCs...");
+ for (int i = 0; i < numThreads; i++) {
+ try {
+ threadId[i].join();
+ } catch (InterruptedException e) {
+ i--; // retry
+ }
+ }
+
// try some multi-calls
Method echo =
TestProtocol.class.getMethod("echo", new Class[] { String.class });
@@ -161,5 +296,4 @@
new TestRPC("test").testCalls();
}
-
}