You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/03/20 00:05:20 UTC
svn commit: r639057 - in /hadoop/core/trunk: CHANGES.txt
src/java/org/apache/hadoop/ipc/Client.java
src/java/org/apache/hadoop/ipc/Server.java
Author: dhruba
Date: Wed Mar 19 16:05:09 2008
New Revision: 639057
URL: http://svn.apache.org/viewvc?rev=639057&view=rev
Log:
HADOOP-2910. Throttle IPC Client/Server during bursts of
requests or server slowdown. (Hairong Kuang via dhruba)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=639057&r1=639056&r2=639057&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Mar 19 16:05:09 2008
@@ -111,6 +111,9 @@
HADOOP-2239. Add HsftpFileSystem to permit transferring files over ssl.
(cdouglas)
+ HADOOP-2910. Throttle IPC Client/Server during bursts of
+ requests or server slowdown. (Hairong Kuang via dhruba)
+
OPTIMIZATIONS
HADOOP-2790. Fixed inefficient method hasSpeculativeTask by removing
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=639057&r1=639056&r2=639057&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java Wed Mar 19 16:05:09 2008
@@ -171,7 +171,7 @@
try {
this.socket = socketFactory.createSocket();
this.socket.setTcpNoDelay(tcpNoDelay);
- this.socket.connect(remoteId.getAddress(), FSConstants.READ_TIMEOUT);
+ this.socket.connect(remoteId.getAddress());
break;
} catch (IOException ie) { //SocketTimeoutException is also caught
if (failures == maxRetries) {
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=639057&r1=639056&r2=639057&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java Wed Mar 19 16:05:09 2008
@@ -45,6 +45,8 @@
import java.util.List;
import java.util.Iterator;
import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -155,7 +157,7 @@
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
volatile private boolean running = true; // true while server runs
- private LinkedList<Call> callQueue = new LinkedList<Call>(); // queued calls
+ private BlockingQueue<Call> callQueue; // queued calls
private List<Connection> connectionList =
Collections.synchronizedList(new LinkedList<Connection>());
@@ -330,6 +332,11 @@
closeCurrentConnection(key, e);
cleanupConnections(true);
try { Thread.sleep(60000); } catch (Exception ie) {}
+ } catch (InterruptedException e) {
+ if (running) { // unexpected -- log it
+ LOG.info(getName() + " caught: " +
+ StringUtils.stringifyException(e));
+ }
} catch (Exception e) {
closeCurrentConnection(key, e);
}
@@ -388,7 +395,7 @@
"; # queued calls: " + callQueue.size());
}
- void doRead(SelectionKey key) {
+ void doRead(SelectionKey key) throws InterruptedException {
int count = 0;
Connection c = (Connection)key.attachment();
if (c == null) {
@@ -398,6 +405,8 @@
try {
count = c.readAndProcess();
+ } catch (InterruptedException ieo) {
+ throw ieo;
} catch (Exception e) {
LOG.debug(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
count = -1; //so that the (count < 0) block is executed
@@ -829,15 +838,7 @@
param.readFields(dis);
Call call = new Call(id, param, this);
- synchronized (callQueue) {
- if (callQueue.size() >= maxQueueSize) {
- Call oldCall = callQueue.removeFirst();
- LOG.warn("Call queue overflow discarding oldest call " + oldCall);
- }
- callQueue.addLast(call); // queue the call
- callQueue.notify(); // wake up a waiting handler
- }
-
+ callQueue.put(call); // queue the call; maybe blocked here
}
private synchronized void close() throws IOException {
@@ -867,14 +868,7 @@
ByteArrayOutputStream buf = new ByteArrayOutputStream(10240);
while (running) {
try {
- Call call;
- synchronized (callQueue) {
- while (running && callQueue.size()==0) { // wait for a call
- callQueue.wait(timeout);
- }
- if (!running) break;
- call = callQueue.removeFirst(); // pop the queue
- }
+ Call call = callQueue.take(); // pop the queue; maybe blocked here
// throw the message away if it is too old
if (System.currentTimeMillis() - call.receivedTime >
@@ -959,6 +953,7 @@
this.socketSendBufferSize = 0;
maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME);
maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
+ this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);