You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by pg...@apache.org on 2002/10/26 06:02:56 UTC

cvs commit: jakarta-james/src/java/org/apache/james/util/connection ServerConnection.java SimpleConnectionManager.java

pgoldstein    2002/10/25 21:02:56

  Modified:    src/java/org/apache/james/util/connection
                        ServerConnection.java SimpleConnectionManager.java
  Log:
  Changing the SimpleConnectionManager so that it pools ClientConnectionRunners.  Also ensure
  that the thread is in a non-interrupted state when it is returned to the pool.
  
  Revision  Changes    Path
  1.2       +145 -47   jakarta-james/src/java/org/apache/james/util/connection/ServerConnection.java
  
  Index: ServerConnection.java
  ===================================================================
  RCS file: /home/cvs/jakarta-james/src/java/org/apache/james/util/connection/ServerConnection.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- ServerConnection.java	7 Oct 2002 07:16:46 -0000	1.1
  +++ ServerConnection.java	26 Oct 2002 04:02:55 -0000	1.2
  @@ -18,9 +18,16 @@
   
   import org.apache.avalon.cornerstone.services.connection.ConnectionHandler;
   import org.apache.avalon.cornerstone.services.connection.ConnectionHandlerFactory;
  +import org.apache.avalon.excalibur.pool.HardResourceLimitingPool;
  +import org.apache.avalon.excalibur.pool.ObjectFactory;
  +import org.apache.avalon.excalibur.pool.Pool;
  +import org.apache.avalon.excalibur.pool.Poolable;
   import org.apache.avalon.excalibur.thread.ThreadPool;
  +import org.apache.avalon.framework.activity.Disposable;
  +import org.apache.avalon.framework.activity.Initializable;
   import org.apache.avalon.framework.component.Component;
   import org.apache.avalon.framework.logger.AbstractLogEnabled;
  +import org.apache.avalon.framework.logger.LogEnabled;
   
   
   /**
  @@ -32,7 +39,7 @@
    * @author Peter M. Goldstein <fa...@alum.mit.edu>
    */
   public class ServerConnection extends AbstractLogEnabled
  -    implements Component, Runnable {
  +    implements Component, Initializable, Runnable {
   
       /**
        * This is a hack to deal with the fact that there appears to be
  @@ -59,6 +66,16 @@
       private ConnectionHandlerFactory handlerFactory;
   
       /**
  +     * The pool that produces ClientConnectionRunners
  +     */
  +    private Pool runnerPool;
  +
  +    /**
  +     * The factory used to provide ClientConnectionRunner objects
  +     */
  +    private ObjectFactory theRunnerFactory = new ClientConnectionRunnerFactory();
  +
  +    /**
        * The thread pool used to spawn individual threads used to manage each
        * client connection.
        */
  @@ -88,7 +105,7 @@
       /**
        * The thread used to manage this server connection.
        */
  -    private Thread serverConnectionThread;    
  +    private Thread serverConnectionThread;
   
       /**
        * The sole constructor for a ServerConnection.
  @@ -114,6 +131,17 @@
       }
   
       /**
  +     * @see org.apache.avalon.framework.activity.Initializable#initialize()
  +     */
  +    public void initialize() throws Exception {
  +        runnerPool = new HardResourceLimitingPool(theRunnerFactory, 5, maxOpenConn);
  +        if (runnerPool instanceof LogEnabled) {
  +            ((LogEnabled)runnerPool).enableLogging(getLogger());
  +        }
  +        ((Initializable)runnerPool).initialize();
  +    }
  +
  +    /**
        * The dispose operation is called by the owning ConnectionManager 
        * at the end of its lifecycle.  Cleans up the server connection, forcing
        * everything to finish.
  @@ -151,14 +179,17 @@
                       // Expected - just complete dispose()
                   }
               }
  +            if (runnerPool instanceof Disposable) {
  +                ((Disposable)runnerPool).dispose();
  +            }
  +            runnerPool = null;
           }
   
           getLogger().debug("Closed server connection - cleaning up clients - " + this.toString());
   
           synchronized (clientConnectionRunners) {
               Iterator runnerIterator = clientConnectionRunners.iterator();
  -            while( runnerIterator.hasNext() )
  -            {
  +            while( runnerIterator.hasNext() ) {
                   ClientConnectionRunner runner = (ClientConnectionRunner)runnerIterator.next();
                   runner.dispose();
                   runner = null;
  @@ -172,14 +203,20 @@
       }
   
       /**
  -     * Adds a ClientConnectionRunner to the set managed by this ServerConnection object.
  +     * Returns a ClientConnectionRunner in the set managed by this ServerConnection object.
        *
        * @param clientConnectionRunner the ClientConnectionRunner to be added
        */
  -    private void addClientConnectionRunner(ClientConnectionRunner clientConnectionRunner) {
  +    private ClientConnectionRunner addClientConnectionRunner()
  +            throws Exception {
           synchronized (clientConnectionRunners) {
  +            ClientConnectionRunner clientConnectionRunner = (ClientConnectionRunner)runnerPool.get();
               clientConnectionRunners.add(clientConnectionRunner);
               openConnections++;
  +            if (getLogger().isDebugEnabled()) {
  +                getLogger().debug("Adding one connection for a total of " + openConnections);
  +            }
  +            return clientConnectionRunner;
           }
       }
   
  @@ -190,8 +227,13 @@
        */
       private void removeClientConnectionRunner(ClientConnectionRunner clientConnectionRunner) {
           synchronized (clientConnectionRunners) {
  -            clientConnectionRunners.remove(clientConnectionRunner);
  -            openConnections--;
  +            if (clientConnectionRunners.remove(clientConnectionRunner)) {
  +                openConnections--;
  +                if (getLogger().isDebugEnabled()) {
  +                    getLogger().debug("Releasing one connection, leaving a total of " + openConnections);
  +                }
  +                runnerPool.put(clientConnectionRunner);
  +            }
           }
       }
   
  @@ -209,7 +251,16 @@
           } catch (SocketException se) {
               // Ignored - for the moment
           }
  -        while( null != serverConnectionThread && !serverConnectionThread.isInterrupted() ) {
  +
  +        if ((getLogger().isDebugEnabled()) && (serverConnectionThread != null)) {
  +            StringBuffer debugBuffer =
  +                new StringBuffer(128)
  +                    .append(serverConnectionThread.getName())
  +                    .append(" is listening on ")
  +                    .append(serverSocket.toString());
  +            getLogger().debug(debugBuffer.toString());
  +        }
  +        while( !Thread.currentThread().interrupted() && null != serverConnectionThread ) {
               try {
                   Socket clientSocket = null;
                   try {
  @@ -233,7 +284,7 @@
                   synchronized (clientConnectionRunners) {
                       if ((maxOpenConn > 0) && (openConnections >= maxOpenConn)) {
                           if (getLogger().isWarnEnabled()) {
  -                            getLogger().warn("Maximum number of open connections exceeded - refusing connection.  Current number of connections is " + openConnections);
  +                           getLogger().warn("Maximum number of open connections exceeded - refusing connection.  Current number of connections is " + openConnections);
                           }
                           try {
                               clientSocket.close();
  @@ -243,20 +294,36 @@
                           continue;
                       } else {
                           clientSocket.setSoTimeout(socketTimeout);
  -                        runner =
  -                            new ClientConnectionRunner(clientSocket, handlerFactory);
  +                        runner = addClientConnectionRunner();
  +                        runner.setSocket(clientSocket);
                       }
                   }
                   setupLogger( runner );
  -                connThreadPool.execute( runner );
  +                try {
  +                    connThreadPool.execute( runner );
  +                } catch (Exception e) {
  +                    // This error indicates that the underlying thread pool
  +                    // is out of threads.  For robustness, we catch this and
  +                    // cleanup
  +                    getLogger().error("Internal error - insufficient threads available to service request.", e);
  +                    try {
  +                        clientSocket.close();
  +                    } catch (IOException ignored) {
  +                        // We ignore this exception, as we already have an error condition.
  +                    }
  +                    // In this case, the thread will not remove the client connection runner,
  +                    // so we must.
  +                    removeClientConnectionRunner(runner);
  +                }
               } catch( IOException ioe ) {
                   getLogger().error( "Exception accepting connection", ioe );
               } catch( Exception e ) {
  -                getLogger().error( "Exception executing client connection runner: " + e.getMessage() );
  +                getLogger().error( "Exception executing client connection runner: " + e.getMessage(), e );
               }
           }
           synchronized( this ) {
               serverConnectionThread = null;
  +            Thread.currentThread().interrupted();
               notifyAll();
           }
       }
  @@ -269,7 +336,7 @@
        * @author Peter M. Goldstein <fa...@alum.mit.edu>
        */
       class ClientConnectionRunner extends AbstractLogEnabled
  -        implements Runnable, Component  {
  +        implements Component, Poolable, Runnable  {
   
           /**
            * The Socket that this client connection is using for transport.
  @@ -281,25 +348,10 @@
            */
           private Thread clientSocketThread;
   
  -        /**
  -         * The ConnectionHandlerFactory that generates a ConnectionHandler for
  -         * this client connection.
  -         */
  -        private ConnectionHandlerFactory clientConnectionHandlerFactory;
  -      
  -        /**
  -         * The constructor for a ClientConnectionRunner.
  -         *
  -         * @param socket the client socket associated with this ClientConnectionRunner
  -         * @param handlerFactory the factory that generates ConnectionHandlers for this
  -         *                       connection
  -         */
  -        public ClientConnectionRunner(Socket socket,
  -                                      ConnectionHandlerFactory handlerFactory) {
  -            clientSocket = socket;
  -            clientConnectionHandlerFactory = handlerFactory;
  +        public ClientConnectionRunner() {
  +            System.out.println("Creating ClientConnectionRunner");
           }
  -      
  +
           /**
            * The dispose operation that terminates the runner.  Should only be
            * called by the ServerConnection that owns the ClientConnectionRunner
  @@ -324,6 +376,15 @@
           }
   
           /**
  +         * Sets the socket for a ClientConnectionRunner.
  +         *
  +         * @param socket the client socket associated with this ClientConnectionRunner
  +         */
  +        public void setSocket(Socket socket) {
  +            clientSocket = socket;
  +        }
  +
  +        /**
            * Provides the body for the thread of execution dealing with a particular client
            * connection.  An appropriate ConnectionHandler is created, applied, executed, 
            * and released. 
  @@ -332,9 +393,8 @@
               ConnectionHandler handler = null;
               try {
                   clientSocketThread = Thread.currentThread();
  -                ServerConnection.this.addClientConnectionRunner(this);
   
  -                handler = clientConnectionHandlerFactory.createConnectionHandler();
  +                handler = ServerConnection.this.handlerFactory.createConnectionHandler();
                   String connectionString = null;
                   if( getLogger().isDebugEnabled() ) {
                       connectionString = getConnectionString();
  @@ -348,31 +408,41 @@
                       String message = "Ending " + connectionString;
                       getLogger().debug( message );
                   }
  +
               } catch( Exception e ) {
  -                getLogger().warn( "Error handling connection", e );
  +                getLogger().error( "Error handling connection", e );
               } finally {
  -    
  +
                   // Close the underlying socket
                   try {
  -                    clientSocket.close();
  +                    if (clientSocket != null) {
  +                        clientSocket.close();
  +                    }
                   } catch( IOException ioe ) {
                       getLogger().warn( "Error shutting down connection", ioe );
                   }
  -    
  -                // Release the handler and kill the reference to the handler factory
  -                if (handler != null) {
  -                    clientConnectionHandlerFactory.releaseConnectionHandler( handler );
  -                    handler = null;
  -                }
  -                clientConnectionHandlerFactory = null;
   
  -                // Remove this runner from the list of active connections.
  -                ServerConnection.this.removeClientConnectionRunner(this);
  +                clientSocket = null;
   
                   // Null out the thread, notify other threads to encourage
                   // a context switch
                   synchronized( this ) {
                       clientSocketThread = null;
  +
  +                    Thread.currentThread().interrupted();
  +
  +                    // Release the handler and kill the reference to the handler factory
  +                    //
  +                    // This needs to be done after the clientSocketThread is nulled out,
  +                    // otherwise we could trash a reused ClientConnectionRunner
  +                    if (handler != null) {
  +                        ServerConnection.this.handlerFactory.releaseConnectionHandler( handler );
  +                        handler = null;
  +                    }
  +
  +                    // Remove this runner from the list of active connections.
  +                    ServerConnection.this.removeClientConnectionRunner(this);
  +
                       notifyAll();
                   }
               }
  @@ -398,6 +468,34 @@
                       .append(":")
                       .append(clientSocket.getPort());
               return connectionBuffer.toString();
  +        }
  +    }
  +
  +    /**
  +     * The factory for producing handlers.
  +     */
  +    private class ClientConnectionRunnerFactory
  +        implements ObjectFactory {
  +
  +        /**
  +         * @see org.apache.avalon.excalibur.pool.ObjectFactory#newInstance()
  +         */
  +        public Object newInstance() throws Exception {
  +            return new ClientConnectionRunner();
  +        }
  +
  +        /**
  +         * @see org.apache.avalon.excalibur.pool.ObjectFactory#getCreatedClass()
  +         */
  +        public Class getCreatedClass() {
  +            return ClientConnectionRunner.class;
  +        }
  +
  +        /**
  +         * @see org.apache.avalon.excalibur.pool.ObjectFactory#decommision(Object)
  +         */
  +        public void decommission( Object object ) throws Exception {
  +            return;
           }
       }
   }
  
  
  
  1.2       +61 -9     jakarta-james/src/java/org/apache/james/util/connection/SimpleConnectionManager.java
  
  Index: SimpleConnectionManager.java
  ===================================================================
  RCS file: /home/cvs/jakarta-james/src/java/org/apache/james/util/connection/SimpleConnectionManager.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SimpleConnectionManager.java	7 Oct 2002 07:16:46 -0000	1.1
  +++ SimpleConnectionManager.java	26 Oct 2002 04:02:55 -0000	1.2
  @@ -15,6 +15,7 @@
   import org.apache.avalon.cornerstone.services.connection.ConnectionHandlerFactory;
   import org.apache.avalon.cornerstone.services.connection.ConnectionManager;
   import org.apache.avalon.cornerstone.services.threads.ThreadManager;
  +import org.apache.avalon.framework.activity.Initializable;
   import org.apache.avalon.framework.component.ComponentException;
   import org.apache.avalon.framework.component.ComponentManager;
   import org.apache.avalon.framework.component.Composable;
  @@ -154,12 +155,14 @@
        * @param socket the ServerSocket from which to
        * @param handlerFactory the factory from which to acquire handlers
        * @param threadPool the thread pool to use
  +     * @param maxOpenConnections the maximum number of open connections allowed for this server socket.
        * @exception Exception if an error occurs
        */
       public void connect( String name,
                            ServerSocket socket,
                            ConnectionHandlerFactory handlerFactory,
  -                         ThreadPool threadPool )
  +                         ThreadPool threadPool,
  +                         int maxOpenConnections )
           throws Exception {
   
           if (disposed) {
  @@ -169,15 +172,40 @@
               throw new IllegalArgumentException( "Connection already exists with name " +
                                                   name );
           }
  -    
  -        ServerConnection runner = new ServerConnection(socket, handlerFactory, threadPool, timeout, maxOpenConn);
  +        if (maxOpenConnections < 0) {
  +            throw new IllegalArgumentException( "The maximum number of client connections per server socket cannot be less that zero.");
  +        }
  +        ServerConnection runner = new ServerConnection(socket, handlerFactory, threadPool, timeout, maxOpenConnections);
           setupLogger( runner );
  +        if (runner instanceof Initializable) {
  +            ((Initializable)runner).initialize();
  +        }
           connectionMap.put( name, runner );
           threadPool.execute(runner);
       }
   
       /**
        * Start managing a connection.
  +     * Management involves accepting connections and farming them out to threads
  +     * from pool to be handled.
  +     *
  +     * @param name the name of connection
  +     * @param socket the ServerSocket from which to
  +     * @param handlerFactory the factory from which to acquire handlers
  +     * @param threadPool the thread pool to use
  +     * @exception Exception if an error occurs
  +     */
  +    public void connect( String name,
  +                         ServerSocket socket,
  +                         ConnectionHandlerFactory handlerFactory,
  +                         ThreadPool threadPool )
  +        throws Exception {
  +
  +        connect(name, socket, handlerFactory, threadPool, maxOpenConn);
  +    }
  +
  +    /**
  +     * Start managing a connection.
        * This is similar to other connect method except that it uses default thread pool.
        *
        * @param name the name of connection
  @@ -188,12 +216,28 @@
       public void connect( String name,
                            ServerSocket socket,
                            ConnectionHandlerFactory handlerFactory )
  -        throws Exception
  -    {
  +        throws Exception {
           connect( name, socket, handlerFactory, threadManager.getDefaultThreadPool() );
       }
   
  -  
  +    /**
  +     * Start managing a connection.
  +     * This is similar to other connect method except that it uses default thread pool.
  +     *
  +     * @param name the name of connection
  +     * @param socket the ServerSocket from which to
  +     * @param handlerFactory the factory from which to acquire handlers
  +     * @param maxOpenConnections the maximum number of open connections allowed for this server socket.
  +     * @exception Exception if an error occurs
  +     */
  +    public void connect( String name,
  +                         ServerSocket socket,
  +                         ConnectionHandlerFactory handlerFactory,
  +                         int maxOpenConnections )
  +        throws Exception {
  +        connect( name, socket, handlerFactory, threadManager.getDefaultThreadPool(), maxOpenConnections );
  +    }
  +
       /**
        * This shuts down all handlers and socket, waiting for each to gracefully shutdown.
        *
  @@ -202,7 +246,6 @@
        */
       public void disconnect( final String name )
           throws Exception {
  -
           disconnect( name, false );
       }
     
  @@ -225,8 +268,17 @@
                                                   name );
           }
   
  -        // TODO: deal with tear down
  +        // TODO: deal with tear down parameter
           connection.dispose();
       }
  -  
  +
  +    /**
  +     * Returns the default maximum number of open connections supported by this
  +     * SimpleConnectionManager
  +     *
  +     * @return the maximum number of connections
  +     */
  +    public int getMaximumNumberOfOpenConnections() {
  +        return maxOpenConn;
  +    }
   }
  
  
  

--
To unsubscribe, e-mail:   <ma...@jakarta.apache.org>
For additional commands, e-mail: <ma...@jakarta.apache.org>