You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/03/20 00:05:20 UTC

svn commit: r639057 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/ipc/Client.java src/java/org/apache/hadoop/ipc/Server.java

Author: dhruba
Date: Wed Mar 19 16:05:09 2008
New Revision: 639057

URL: http://svn.apache.org/viewvc?rev=639057&view=rev
Log:
HADOOP-2910. Throttle IPC Client/Server during bursts of
requests or server slowdown. (Hairong Kuang via dhruba)


Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
    hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=639057&r1=639056&r2=639057&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Mar 19 16:05:09 2008
@@ -111,6 +111,9 @@
     HADOOP-2239. Add HsftpFileSystem to permit transferring files over ssl.
     (cdouglas)
 
+    HADOOP-2910. Throttle IPC Client/Server during bursts of 
+    requests or server slowdown. (Hairong Kuang via dhruba)
+
   OPTIMIZATIONS
 
     HADOOP-2790.  Fixed inefficient method hasSpeculativeTask by removing

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=639057&r1=639056&r2=639057&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java Wed Mar 19 16:05:09 2008
@@ -171,7 +171,7 @@
         try {
           this.socket = socketFactory.createSocket();
           this.socket.setTcpNoDelay(tcpNoDelay);
-          this.socket.connect(remoteId.getAddress(), FSConstants.READ_TIMEOUT);
+          this.socket.connect(remoteId.getAddress());
           break;
         } catch (IOException ie) { //SocketTimeoutException is also caught 
           if (failures == maxRetries) {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=639057&r1=639056&r2=639057&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java Wed Mar 19 16:05:09 2008
@@ -45,6 +45,8 @@
 import java.util.List;
 import java.util.Iterator;
 import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -155,7 +157,7 @@
   private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
 
   volatile private boolean running = true;         // true while server runs
-  private LinkedList<Call> callQueue = new LinkedList<Call>(); // queued calls
+  private BlockingQueue<Call> callQueue; // queued calls
 
   private List<Connection> connectionList = 
     Collections.synchronizedList(new LinkedList<Connection>());
@@ -330,6 +332,11 @@
           closeCurrentConnection(key, e);
           cleanupConnections(true);
           try { Thread.sleep(60000); } catch (Exception ie) {}
+        } catch (InterruptedException e) {
+          if (running) {                          // unexpected -- log it
+            LOG.info(getName() + " caught: " +
+                     StringUtils.stringifyException(e));
+          }
         } catch (Exception e) {
           closeCurrentConnection(key, e);
         }
@@ -388,7 +395,7 @@
                   "; # queued calls: " + callQueue.size());
     }
 
-    void doRead(SelectionKey key) {
+    void doRead(SelectionKey key) throws InterruptedException {
       int count = 0;
       Connection c = (Connection)key.attachment();
       if (c == null) {
@@ -398,6 +405,8 @@
       
       try {
         count = c.readAndProcess();
+      } catch (InterruptedException ieo) {
+        throw ieo;
       } catch (Exception e) {
         LOG.debug(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
         count = -1; //so that the (count < 0) block is executed
@@ -829,15 +838,7 @@
       param.readFields(dis);        
         
       Call call = new Call(id, param, this);
-      synchronized (callQueue) {
-        if (callQueue.size() >= maxQueueSize) {
-          Call oldCall = callQueue.removeFirst();
-          LOG.warn("Call queue overflow discarding oldest call " + oldCall);
-        }
-        callQueue.addLast(call);              // queue the call
-        callQueue.notify();                   // wake up a waiting handler
-      }
-        
+      callQueue.put(call);              // queue the call; maybe blocked here
     }
 
     private synchronized void close() throws IOException {
@@ -867,14 +868,7 @@
       ByteArrayOutputStream buf = new ByteArrayOutputStream(10240);
       while (running) {
         try {
-          Call call;
-          synchronized (callQueue) {
-            while (running && callQueue.size()==0) { // wait for a call
-              callQueue.wait(timeout);
-            }
-            if (!running) break;
-            call = callQueue.removeFirst(); // pop the queue
-          }
+          Call call = callQueue.take(); // pop the queue; maybe blocked here
 
           // throw the message away if it is too old
           if (System.currentTimeMillis() - call.receivedTime > 
@@ -959,6 +953,7 @@
     this.socketSendBufferSize = 0;
     maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME);
     maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
+    this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize); 
     this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);