You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/06/15 00:41:25 UTC

svn commit: r414404 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/ipc/Client.java src/java/org/apache/hadoop/ipc/Server.java

Author: cutting
Date: Wed Jun 14 15:41:25 2006
New Revision: 414404

URL: http://svn.apache.org/viewvc?rev=414404&view=rev
Log:
Reverting patch for HADOOP-210, which was causing problems.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=414404&r1=414403&r2=414404&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Jun 14 15:41:25 2006
@@ -3,11 +3,7 @@
 
 Trunk (unreleased changes)
 
- 1. HADOOP-210.  Change RPC client and server so that server uses a
-    selector, and a thread per connection is no longer required.  This
-    should permit larger clusters.  (Devaraj Das via cutting)
-
- 2. HADOOP-298.  Improved progress reports for CopyFiles utility, the
+ 1. HADOOP-298.  Improved progress reports for CopyFiles utility, the
     distributed file copier.  (omalley via cutting)
 
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=414404&r1=414403&r2=414404&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java Wed Jun 14 15:41:25 2006
@@ -38,7 +38,6 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.DataOutputBuffer;
 
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -197,15 +196,8 @@
             LOG.debug(getName() + " sending #" + call.id);
           try {
             writingCall = call;
-            DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
-                                                         //data to be written
-            d.writeInt(call.id);
-            call.param.write(d);
-            byte[] data = d.getData();
-            int dataLength = d.getLength();
-
-            out.writeInt(dataLength);      //first put the data length
-            out.write(data, 0, dataLength);//write the data
+            out.writeInt(call.id);
+            call.param.write(out);
             out.flush();
           } finally {
             writingCall = null;
@@ -216,7 +208,7 @@
         if (error)
           close();                                // close on error
       }
-    }  
+    }
 
     /** Close the connection and remove it from the pool. */
     public void close() {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=414404&r1=414403&r2=414404&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Wed Jun 14 15:41:25 2006
@@ -20,23 +20,17 @@
 import java.io.EOFException;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.StringWriter;
 import java.io.PrintWriter;
-import java.io.ByteArrayInputStream;
 
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.BufferUnderflowException;
-
-import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.ServerSocket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
 
 import java.util.LinkedList;
-import java.util.Iterator;
-import java.util.Random;
 
 import org.apache.commons.logging.*;
 
@@ -44,8 +38,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-
-import org.mortbay.http.nio.SocketChannelOutputStream;
+import org.apache.hadoop.io.UTF8;
 
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -72,16 +65,6 @@
   private int handlerCount;                       // number of handler threads
   private int maxQueuedCalls;                     // max number of queued calls
   private Class paramClass;                       // class of call parameters
-  private int maxIdleTime;                        // the maximum idle time after 
-                                                  // which a client may be disconnected
-  private int thresholdIdleConnections;           // the number of idle connections
-                                                  // after which we will start
-                                                  // cleaning up idle 
-                                                  // connections
-  int maxConnectionsToNuke;                       // the max number of 
-                                                  // connections to nuke
-                                                  //during a cleanup
-  
   private Configuration conf;
 
   private int timeout;
@@ -90,12 +73,6 @@
   private LinkedList callQueue = new LinkedList(); // queued calls
   private Object callDequeued = new Object();     // used by wait/notify
 
-  private InetSocketAddress address; //the address we bind at
-  private ServerSocketChannel acceptChannel = null; //the (main) accept channel
-  private Selector selector = null; //the selector that we use for the server
-  private Listener listener;
-  private int numConnections = 0;
-  
   /** A call queued for handling. */
   private static class Call {
     private int id;                               // the client's call id
@@ -109,300 +86,113 @@
     }
   }
 
-  /** Listens on the socket. Creates jobs for the handler threads*/
+  /** Listens on the socket, starting new connection threads. */
   private class Listener extends Thread {
-    
-    private LinkedList connectionList = new LinkedList(); //maintain a list
-                                                       //of client connectionss
-    private Random rand = new Random();
-    private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
-                                         //-tion (for idle connections) ran
-    private int cleanupInterval = 10000; //the minimum interval between 
-                                         //two cleanup runs
-    
-    public Listener() {
-      address = new InetSocketAddress(port);
+    private ServerSocket socket;
+
+    public Listener() throws IOException {
+      this.socket = new ServerSocket(port);
+      socket.setSoTimeout(timeout);
       this.setDaemon(true);
-    }
-    /** cleanup connections from connectionList. Choose a random range
-     * to scan and also have a limit on the number of the connections
-     * that will be cleanedup per run. The criteria for cleanup is the time
-     * for which the connection was idle. If 'force' is true then all 
-     * connections will be looked at for the cleanup.
-     */
-    private void cleanupConnections(boolean force) {
-      if (force || numConnections > thresholdIdleConnections) {
-        long currentTime = System.currentTimeMillis();
-        if (!force && (int)(currentTime - lastCleanupRunTime) < cleanupInterval) {
-          return;
-        }
-        int start = 0;
-        int end = numConnections - 1;
-        if (!force) {
-          start = rand.nextInt() % numConnections;
-          end = rand.nextInt() % numConnections;
-          int temp;
-          if (end < start) {
-            temp = start;
-            start = end;
-            end = temp;
-          }
-        }
-        int i = start;
-        int numNuked = 0;
-        while (i <= end) {
-          Connection c = (Connection)connectionList.get(i);
-          if (c.timedOut(currentTime)) {
-            connectionList.remove(i);
-            try {
-              LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
-              c.close();
-            } catch (Exception e) {}
-            numNuked++;
-            end--;
-            if (!force && numNuked == maxConnectionsToNuke) break;
-          }
-          else i++;
-        }
-        lastCleanupRunTime = System.currentTimeMillis();
-      }
+      this.setName("Server listener on port " + port);
     }
 
     public void run() {
-      SERVER.set(Server.this);
-      
-      try {
-        // Create a new server socket and set to non blocking mode
-        acceptChannel = ServerSocketChannel.open();
-        acceptChannel.configureBlocking(false);
-
-        // Bind the server socket to the local host and port
-        acceptChannel.socket().bind(address);
-
-        // create a selector;
-        selector= Selector.open();
-
-        // Register accepts on the server socket with the selector.
-        acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
-        this.setName("Server listener on port " + port);
-          
-        LOG.info(getName() + ": starting");
-        
-        while (running) {
-          SelectionKey key = null;
+      LOG.info(getName() + ": starting");
+      while (running) {
+        Socket acceptedSock = null;
+        try {
+          acceptedSock = socket.accept();
+          new Connection(acceptedSock).start(); // start a new connection
+        } catch (SocketTimeoutException e) {      // ignore timeouts
+        } catch (OutOfMemoryError e) {
+          // we can run out of memory if we have too many threads
+          // log the event and sleep for a minute and give 
+          // some thread(s) a chance to finish
+          LOG.warn(getName() + " out of memory, sleeping...", e);          
           try {
-            selector.select(timeout);
-            Iterator iter = selector.selectedKeys().iterator();
-            
-            while (iter.hasNext()) {
-              key = (SelectionKey)iter.next();
-              if (key.isAcceptable())
-                doAccept(key);
-              else if (key.isReadable())
-                doRead(key);
-              iter.remove();
-              key = null;
-            }
-          } catch (OutOfMemoryError e) {
-            closeCurrentConnection(key, e);
-            cleanupConnections(true);
+            acceptedSock.close();
             Thread.sleep(60000);
-          } catch (Exception e) {
-            closeCurrentConnection(key, e);
-          }
-          cleanupConnections(false);
-        }
-      } catch (Exception e) {
-        LOG.fatal("selector",e);  
-      }
-      LOG.info("Stopping " + this.getName());
-
-      try {
-        if (acceptChannel != null)
-          acceptChannel.close();
-        if (selector != null)
-          selector.close();
-      } catch (IOException e) { }
-
-      selector= null;
-      acceptChannel= null;
-      connectionList = null;
-    }
-
-    private void closeCurrentConnection(SelectionKey key, Throwable e) {
-      if (running) {
-        LOG.warn("selector: " + e);
-        e.printStackTrace();
-      }
-      if (key != null) {
-        Connection c = (Connection)key.attachment();
-        if (c != null) {
-          connectionList.remove(c);
-          try {
-            LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
-            c.close();
-          } catch (Exception ex) {}
+          } catch (InterruptedException ie) { // ignore interrupts
+          } catch (IOException ioe) { // ignore IOexceptions
+          }          
         }
+        catch (Exception e) {           // log all other exceptions
+          LOG.info(getName() + " caught: " + e, e);
+        }        
       }
-    }
-
-    void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
-      Connection c = null;
-      ServerSocketChannel server = (ServerSocketChannel) key.channel();
-      SocketChannel channel = server.accept();
-      channel.configureBlocking(false);
-      SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
-      c = new Connection(readKey, channel, System.currentTimeMillis());
-      readKey.attach(c);
-      connectionList.addLast(c);
-      numConnections++;
-      LOG.info("Server connection on port " + port + " from " + 
-                c.getHostAddress() + ": starting");
-    }
-
-    void doRead(SelectionKey key) {
-      int count = 0;
-      if (!key.isValid() || !key.isReadable())
-        return;
-      Connection c = (Connection)key.attachment();
-      if (c == null) {
-        return;  
-      }
-      c.setLastContact(System.currentTimeMillis());
-      
       try {
-        count = c.readAndProcess();
-      } catch (Exception e) {
-        LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count);
-        count = -1; //so that the (count < 0) block is executed
+        socket.close();
+      } catch (IOException e) {
+        LOG.info(getName() + ": e=" + e);
       }
-      if (count < 0) {
-        connectionList.remove(c);
-        try {
-          LOG.info(getName() + ": disconnecting client " + c.getHostAddress());
-          c.close();
-        } catch (Exception e) {}
-      }
-      else {
-        c.setLastContact(System.currentTimeMillis());
-      }
-    }   
-
-    void doStop()
-    {
-        selector.wakeup();
-        Thread.yield();
+      LOG.info(getName() + ": exiting");
     }
   }
 
   /** Reads calls from a connection and queues them for handling. */
-  private class Connection {
-    private SocketChannel channel;
-    private SelectionKey key;
-    private ByteBuffer data;
-    private ByteBuffer dataLengthBuffer;
-    private DataOutputStream out;
-    private long lastContact;
-    private int dataLength;
+  private class Connection extends Thread {
     private Socket socket;
+    private DataInputStream in;
+    private DataOutputStream out;
 
-    public Connection(SelectionKey key, SocketChannel channel, 
-    long lastContact) {
-      this.key = key;
-      this.channel = channel;
-      this.lastContact = lastContact;
-      this.data = null;
-      this.dataLengthBuffer = null;
-      this.socket = channel.socket();
+    public Connection(Socket socket) throws IOException {
+      this.socket = socket;
+      socket.setSoTimeout(timeout);
+      this.in = new DataInputStream
+        (new BufferedInputStream(socket.getInputStream()));
       this.out = new DataOutputStream
-        (new SocketChannelOutputStream(channel, 4096));
-    }   
-
-    public String getHostAddress() {
-      return socket.getInetAddress().getHostAddress();
-    }
-
-    public void setLastContact(long lastContact) {
-      this.lastContact = lastContact;
-    }
-
-    public long getLastContact() {
-      return lastContact;
-    }
-
-    private boolean timedOut() {
-      if(System.currentTimeMillis() -  lastContact > maxIdleTime)
-        return true;
-      return false;
-    }
-
-    private boolean timedOut(long currentTime) {
-        if(currentTime -  lastContact > timeout)
-          return true;
-        return false;
-    }
-
-    public int readAndProcess() throws IOException, InterruptedException {
-      int count = -1;
-      if (dataLengthBuffer == null)
-        dataLengthBuffer = ByteBuffer.allocateDirect(4);
-      if (dataLengthBuffer.remaining() > 0) {
-        count = channel.read(dataLengthBuffer);
-        if (count < 0) return count;
-        if (dataLengthBuffer.remaining() == 0) {
-          dataLengthBuffer.flip(); 
-          dataLength = dataLengthBuffer.getInt();
-          data = ByteBuffer.allocateDirect(dataLength);
-        }
-        return count;
-      }
-      count = channel.read(data);
-      if (data.remaining() == 0) {
-        data.flip();
-        processData();
-        data = dataLengthBuffer = null; 
-      }
-      return count;
+        (new BufferedOutputStream(socket.getOutputStream()));
+      this.setDaemon(true);
+      this.setName("Server connection on port " + port + " from "
+                   + socket.getInetAddress().getHostAddress());
     }
 
-    private void processData() throws  IOException, InterruptedException {
-      byte[] bytes = new byte[dataLength];
-      data.get(bytes);
-      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
-      int id = dis.readInt();                    // try to read an id
+    public void run() {
+      LOG.info(getName() + ": starting");
+      SERVER.set(Server.this);
+      try {
+        while (running) {
+          int id;
+          try {
+            id = in.readInt();                    // try to read an id
+          } catch (SocketTimeoutException e) {
+            continue;
+          }
         
-      if (LOG.isDebugEnabled())
-        LOG.debug(" got #" + id);
-            
-      Writable param = makeParam();           // read param
-      param.readFields(dis);        
+          if (LOG.isDebugEnabled())
+            LOG.debug(getName() + " got #" + id);
         
-      Call call = new Call(id, param, this);
-      synchronized (callQueue) {
-        callQueue.addLast(call);              // queue the call
-        callQueue.notify();                   // wake up a waiting handler
-      }
+          Writable param = makeParam();           // read param
+          param.readFields(in);        
+        
+          Call call = new Call(id, param, this);
+        
+          synchronized (callQueue) {
+            callQueue.addLast(call);              // queue the call
+            callQueue.notify();                   // wake up a waiting handler
+          }
         
-      while (running && callQueue.size() >= maxQueuedCalls) {
-        synchronized (callDequeued) {         // queue is full
-          callDequeued.wait(timeout);         // wait for a dequeue
+          while (running && callQueue.size() >= maxQueuedCalls) {
+            synchronized (callDequeued) {         // queue is full
+              callDequeued.wait(timeout);         // wait for a dequeue
+            }
+          }
         }
+      } catch (EOFException eof) {
+          // This is what happens on linux when the other side shuts down
+      } catch (SocketException eof) {
+          // This is what happens on Win32 when the other side shuts down
+      } catch (Exception e) {
+        LOG.info(getName() + " caught: " + e, e);
+      } finally {
+        try {
+          socket.close();
+        } catch (IOException e) {}
+        LOG.info(getName() + ": exiting");
       }
     }
 
-    private void close() throws IOException {
-      data = null;
-      dataLengthBuffer = null;
-      if (!channel.isOpen())
-        return;
-      socket.shutdownOutput();
-      channel.close();
-      socket.close();
-      channel.close();
-      out.close();
-      key.cancel();
-      numConnections--;
-    }
   }
 
   /** Handles queued calls . */
@@ -455,6 +245,7 @@
               WritableUtils.writeString(out, errorClass);
               WritableUtils.writeString(out, error);
             }
+            out.flush();
           }
 
         } catch (Exception e) {
@@ -484,10 +275,7 @@
     this.paramClass = paramClass;
     this.handlerCount = handlerCount;
     this.maxQueuedCalls = handlerCount;
-    this.timeout = conf.getInt("ipc.client.timeout",10000);
-    this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
-    this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
-    this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 120000);
+    this.timeout = conf.getInt("ipc.client.timeout",10000); 
   }
 
   /** Sets the timeout used for network i/o. */
@@ -495,7 +283,7 @@
 
   /** Starts the service.  Must be called before any calls will be handled. */
   public synchronized void start() throws IOException {
-    listener = new Listener();
+    Listener listener = new Listener();
     listener.start();
     
     for (int i = 0; i < handlerCount; i++) {
@@ -510,7 +298,6 @@
   public synchronized void stop() {
     LOG.info("Stopping server on " + port);
     running = false;
-    listener.doStop();
     try {
       Thread.sleep(timeout);     //  inexactly wait for pending requests to finish
     } catch (InterruptedException e) {}