You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/10/06 23:12:09 UTC

svn commit: r453766 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/ipc/Server.java

Author: cutting
Date: Fri Oct  6 14:12:08 2006
New Revision: 453766

URL: http://svn.apache.org/viewvc?view=rev&rev=453766
Log:
HADOOP-255.  Discard stale queued IPC calls.  Contributed by Owen.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=453766&r1=453765&r2=453766
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Oct  6 14:12:08 2006
@@ -142,6 +142,14 @@
 34. HADOOP-506.  Ignore heartbeats from stale task trackers.
    (Sanjay Dahiya via cutting)
 
+35. HADOOP-255.  Discard stale, queued IPC calls.  Do not process
+    calls whose clients will likely time out before they receive a
+    response.  When the queue is full, new calls are now received and
+    queued, and the oldest calls are discarded, so that, when servers
+    get bogged down, they no longer develop a backlog on the socket.
+    This should improve some DFS namenode failure modes.
+    (omalley via cutting)
+
 
 Release 0.6.2 - 2006-09-18
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?view=diff&rev=453766&r1=453765&r2=453766
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Fri Oct  6 14:12:08 2006
@@ -17,7 +17,6 @@
 package org.apache.hadoop.ipc;
 
 import java.io.IOException;
-import java.io.EOFException;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.BufferedOutputStream;
@@ -30,7 +29,6 @@
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.nio.BufferUnderflowException;
 
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -57,6 +55,18 @@
  * @see Client
  */
 public abstract class Server {
+  /**
+   * How much time should be allocated for actually running the handler?
+   * Calls that are older than ipc.client.timeout * MAX_CALL_QUEUE_TIME
+   * are ignored when the handler takes them off the queue.
+   */
+  private static final float MAX_CALL_QUEUE_TIME = 0.6f;
+  
+  /**
+   * How many calls per handler are allowed in the queue.
+   */
+  private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
+  
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.Server");
 
@@ -72,7 +82,6 @@
   private String bindAddress; 
   private int port;                               // port we listen on
   private int handlerCount;                       // number of handler threads
-  private int maxQueuedCalls;                     // max number of queued calls
   private Class paramClass;                       // class of call parameters
   private int maxIdleTime;                        // the maximum idle time after 
                                                   // which a client may be disconnected
@@ -87,6 +96,8 @@
   private Configuration conf;
 
   private int timeout;
+  private long maxCallStartAge;
+  private int maxQueueSize;
 
   private boolean running = true;                 // true while server runs
   private LinkedList callQueue = new LinkedList(); // queued calls
@@ -103,11 +114,17 @@
     private int id;                               // the client's call id
     private Writable param;                       // the parameter passed
     private Connection connection;                // connection to client
+    private long receivedTime;                    // the time received
 
     public Call(int id, Writable param, Connection connection) {
       this.id = id;
       this.param = param;
       this.connection = connection;
+      this.receivedTime = System.currentTimeMillis();
+    }
+    
+    public String toString() {
+      return param.toString() + " from " + connection.toString();
     }
   }
 
@@ -348,6 +365,10 @@
          this.channelOut = new SocketChannelOutputStream(channel, 4096)));
     }   
 
+    public String toString() {
+      return getHostAddress() + ":" + socket.getPort(); 
+    }
+    
     public String getHostAddress() {
       return socket.getInetAddress().getHostAddress();
     }
@@ -409,15 +430,13 @@
         
       Call call = new Call(id, param, this);
       synchronized (callQueue) {
+        if (callQueue.size() >= maxQueueSize) {
+          callQueue.removeFirst();
+        }
         callQueue.addLast(call);              // queue the call
         callQueue.notify();                   // wake up a waiting handler
       }
         
-      while (running && callQueue.size() >= maxQueuedCalls) {
-        synchronized (callDequeued) {         // queue is full
-          callDequeued.wait(timeout);         // wait for a dequeue
-        }
-      }
     }
 
     private void close() throws IOException {
@@ -462,6 +481,15 @@
             callDequeued.notify();
           }
 
+          // throw the message away if it is too old
+          if (System.currentTimeMillis() - call.receivedTime > 
+              maxCallStartAge) {
+            LOG.info("Call " + call.toString() + 
+                     " discarded for being too old (" +
+                     (System.currentTimeMillis() - call.receivedTime) + ")");
+            continue;
+          }
+          
           if (LOG.isDebugEnabled())
             LOG.debug(getName() + ": has #" + call.id + " from " +
                      call.connection.socket.getInetAddress().getHostAddress());
@@ -526,8 +554,9 @@
     this.port = port;
     this.paramClass = paramClass;
     this.handlerCount = handlerCount;
-    this.maxQueuedCalls = handlerCount;
     this.timeout = conf.getInt("ipc.client.timeout",10000);
+    maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME);
+    maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
     this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);