You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/10/06 23:12:09 UTC
svn commit: r453766 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/ipc/Server.java
Author: cutting
Date: Fri Oct 6 14:12:08 2006
New Revision: 453766
URL: http://svn.apache.org/viewvc?view=rev&rev=453766
Log:
HADOOP-255. Discard stale queued IPC calls. Contributed by Owen.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=453766&r1=453765&r2=453766
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Oct 6 14:12:08 2006
@@ -142,6 +142,14 @@
34. HADOOP-506. Ignore heartbeats from stale task trackers.
(Sanjay Dahiya via cutting)
+35. HADOOP-255. Discard stale, queued IPC calls. Do not process
+ calls whose clients will likely time out before they receive a
+ response. When the queue is full, new calls are now received and
+ queued, and the oldest calls are discarded, so that, when servers
+ get bogged down, they no longer develop a backlog on the socket.
+ This should improve some DFS namenode failure modes.
+ (omalley via cutting)
+
Release 0.6.2 - 2006-09-18
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?view=diff&rev=453766&r1=453765&r2=453766
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Fri Oct 6 14:12:08 2006
@@ -17,7 +17,6 @@
package org.apache.hadoop.ipc;
import java.io.IOException;
-import java.io.EOFException;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.BufferedOutputStream;
@@ -30,7 +29,6 @@
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
-import java.nio.BufferUnderflowException;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -57,6 +55,18 @@
* @see Client
*/
public abstract class Server {
+ /**
+ * Fraction of the client timeout a call may wait in the queue; the rest is
+ * left for running the handler. Calls older than ipc.client.timeout *
+ * MAX_CALL_QUEUE_TIME are ignored when the handler takes them off the queue.
+ */
+ private static final float MAX_CALL_QUEUE_TIME = 0.6f;
+
+ /**
+ * How many calls/handler are allowed in the queue.
+ */
+ private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
+
public static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.Server");
@@ -72,7 +82,6 @@
private String bindAddress;
private int port; // port we listen on
private int handlerCount; // number of handler threads
- private int maxQueuedCalls; // max number of queued calls
private Class paramClass; // class of call parameters
private int maxIdleTime; // the maximum idle time after
// which a client may be disconnected
@@ -87,6 +96,8 @@
private Configuration conf;
private int timeout;
+ private long maxCallStartAge;
+ private int maxQueueSize;
private boolean running = true; // true while server runs
private LinkedList callQueue = new LinkedList(); // queued calls
@@ -103,11 +114,17 @@
private int id; // the client's call id
private Writable param; // the parameter passed
private Connection connection; // connection to client
+ private long receivedTime; // the time received
public Call(int id, Writable param, Connection connection) {
this.id = id;
this.param = param;
this.connection = connection;
+ this.receivedTime = System.currentTimeMillis();
+ }
+
+ public String toString() {
+ return param.toString() + " from " + connection.toString();
}
}
@@ -348,6 +365,10 @@
this.channelOut = new SocketChannelOutputStream(channel, 4096)));
}
+ public String toString() {
+ return getHostAddress() + ":" + socket.getPort();
+ }
+
public String getHostAddress() {
return socket.getInetAddress().getHostAddress();
}
@@ -409,15 +430,13 @@
Call call = new Call(id, param, this);
synchronized (callQueue) {
+ if (callQueue.size() >= maxQueueSize) {
+ callQueue.removeFirst();
+ }
callQueue.addLast(call); // queue the call
callQueue.notify(); // wake up a waiting handler
}
- while (running && callQueue.size() >= maxQueuedCalls) {
- synchronized (callDequeued) { // queue is full
- callDequeued.wait(timeout); // wait for a dequeue
- }
- }
}
private void close() throws IOException {
@@ -462,6 +481,15 @@
callDequeued.notify();
}
+ // throw the message away if it is too old
+ if (System.currentTimeMillis() - call.receivedTime >
+ maxCallStartAge) {
+ LOG.info("Call " + call.toString() +
+ " discarded for being too old (" +
+ (System.currentTimeMillis() - call.receivedTime) + ")");
+ continue;
+ }
+
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": has #" + call.id + " from " +
call.connection.socket.getInetAddress().getHostAddress());
@@ -526,8 +554,9 @@
this.port = port;
this.paramClass = paramClass;
this.handlerCount = handlerCount;
- this.maxQueuedCalls = handlerCount;
this.timeout = conf.getInt("ipc.client.timeout",10000);
+ maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME);
+ maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);