You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by pg...@apache.org on 2002/10/26 06:02:56 UTC
cvs commit: jakarta-james/src/java/org/apache/james/util/connection ServerConnection.java SimpleConnectionManager.java
pgoldstein 2002/10/25 21:02:56
Modified: src/java/org/apache/james/util/connection
ServerConnection.java SimpleConnectionManager.java
Log:
Changing the SimpleConnectionManager so that it pools ClientConnectionRunners. Also ensure
that the thread is in a non-interrupted state when it is returned to the pool.
Revision Changes Path
1.2 +145 -47 jakarta-james/src/java/org/apache/james/util/connection/ServerConnection.java
Index: ServerConnection.java
===================================================================
RCS file: /home/cvs/jakarta-james/src/java/org/apache/james/util/connection/ServerConnection.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- ServerConnection.java 7 Oct 2002 07:16:46 -0000 1.1
+++ ServerConnection.java 26 Oct 2002 04:02:55 -0000 1.2
@@ -18,9 +18,16 @@
import org.apache.avalon.cornerstone.services.connection.ConnectionHandler;
import org.apache.avalon.cornerstone.services.connection.ConnectionHandlerFactory;
+import org.apache.avalon.excalibur.pool.HardResourceLimitingPool;
+import org.apache.avalon.excalibur.pool.ObjectFactory;
+import org.apache.avalon.excalibur.pool.Pool;
+import org.apache.avalon.excalibur.pool.Poolable;
import org.apache.avalon.excalibur.thread.ThreadPool;
+import org.apache.avalon.framework.activity.Disposable;
+import org.apache.avalon.framework.activity.Initializable;
import org.apache.avalon.framework.component.Component;
import org.apache.avalon.framework.logger.AbstractLogEnabled;
+import org.apache.avalon.framework.logger.LogEnabled;
/**
@@ -32,7 +39,7 @@
* @author Peter M. Goldstein <fa...@alum.mit.edu>
*/
public class ServerConnection extends AbstractLogEnabled
- implements Component, Runnable {
+ implements Component, Initializable, Runnable {
/**
* This is a hack to deal with the fact that there appears to be
@@ -59,6 +66,16 @@
private ConnectionHandlerFactory handlerFactory;
/**
+ * The pool that produces ClientConnectionRunners
+ */
+ private Pool runnerPool;
+
+ /**
+ * The factory used to provide ClientConnectionRunner objects
+ */
+ private ObjectFactory theRunnerFactory = new ClientConnectionRunnerFactory();
+
+ /**
* The thread pool used to spawn individual threads used to manage each
* client connection.
*/
@@ -88,7 +105,7 @@
/**
* The thread used to manage this server connection.
*/
- private Thread serverConnectionThread;
+ private Thread serverConnectionThread;
/**
* The sole constructor for a ServerConnection.
@@ -114,6 +131,17 @@
}
/**
+ * @see org.apache.avalon.framework.activity.Initializable#initialize()
+ */
+ public void initialize() throws Exception {
+ runnerPool = new HardResourceLimitingPool(theRunnerFactory, 5, maxOpenConn);
+ if (runnerPool instanceof LogEnabled) {
+ ((LogEnabled)runnerPool).enableLogging(getLogger());
+ }
+ ((Initializable)runnerPool).initialize();
+ }
+
+ /**
* The dispose operation is called by the owning ConnectionManager
* at the end of its lifecycle. Cleans up the server connection, forcing
* everything to finish.
@@ -151,14 +179,17 @@
// Expected - just complete dispose()
}
}
+ if (runnerPool instanceof Disposable) {
+ ((Disposable)runnerPool).dispose();
+ }
+ runnerPool = null;
}
getLogger().debug("Closed server connection - cleaning up clients - " + this.toString());
synchronized (clientConnectionRunners) {
Iterator runnerIterator = clientConnectionRunners.iterator();
- while( runnerIterator.hasNext() )
- {
+ while( runnerIterator.hasNext() ) {
ClientConnectionRunner runner = (ClientConnectionRunner)runnerIterator.next();
runner.dispose();
runner = null;
@@ -172,14 +203,20 @@
}
/**
- * Adds a ClientConnectionRunner to the set managed by this ServerConnection object.
+ * Obtains a ClientConnectionRunner from the pool, adds it to the set managed by this ServerConnection object, and returns it.
*
* @param clientConnectionRunner the ClientConnectionRunner to be added
*/
- private void addClientConnectionRunner(ClientConnectionRunner clientConnectionRunner) {
+ private ClientConnectionRunner addClientConnectionRunner()
+ throws Exception {
synchronized (clientConnectionRunners) {
+ ClientConnectionRunner clientConnectionRunner = (ClientConnectionRunner)runnerPool.get();
clientConnectionRunners.add(clientConnectionRunner);
openConnections++;
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("Adding one connection for a total of " + openConnections);
+ }
+ return clientConnectionRunner;
}
}
@@ -190,8 +227,13 @@
*/
private void removeClientConnectionRunner(ClientConnectionRunner clientConnectionRunner) {
synchronized (clientConnectionRunners) {
- clientConnectionRunners.remove(clientConnectionRunner);
- openConnections--;
+ if (clientConnectionRunners.remove(clientConnectionRunner)) {
+ openConnections--;
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("Releasing one connection, leaving a total of " + openConnections);
+ }
+ runnerPool.put(clientConnectionRunner);
+ }
}
}
@@ -209,7 +251,16 @@
} catch (SocketException se) {
// Ignored - for the moment
}
- while( null != serverConnectionThread && !serverConnectionThread.isInterrupted() ) {
+
+ if ((getLogger().isDebugEnabled()) && (serverConnectionThread != null)) {
+ StringBuffer debugBuffer =
+ new StringBuffer(128)
+ .append(serverConnectionThread.getName())
+ .append(" is listening on ")
+ .append(serverSocket.toString());
+ getLogger().debug(debugBuffer.toString());
+ }
+ while( !Thread.currentThread().interrupted() && null != serverConnectionThread ) {
try {
Socket clientSocket = null;
try {
@@ -233,7 +284,7 @@
synchronized (clientConnectionRunners) {
if ((maxOpenConn > 0) && (openConnections >= maxOpenConn)) {
if (getLogger().isWarnEnabled()) {
- getLogger().warn("Maximum number of open connections exceeded - refusing connection. Current number of connections is " + openConnections);
+ getLogger().warn("Maximum number of open connections exceeded - refusing connection. Current number of connections is " + openConnections);
}
try {
clientSocket.close();
@@ -243,20 +294,36 @@
continue;
} else {
clientSocket.setSoTimeout(socketTimeout);
- runner =
- new ClientConnectionRunner(clientSocket, handlerFactory);
+ runner = addClientConnectionRunner();
+ runner.setSocket(clientSocket);
}
}
setupLogger( runner );
- connThreadPool.execute( runner );
+ try {
+ connThreadPool.execute( runner );
+ } catch (Exception e) {
+ // This error indicates that the underlying thread pool
+ // is out of threads. For robustness, we catch this and
+ // cleanup
+ getLogger().error("Internal error - insufficient threads available to service request.", e);
+ try {
+ clientSocket.close();
+ } catch (IOException ignored) {
+ // We ignore this exception, as we already have an error condition.
+ }
+ // In this case, the thread will not remove the client connection runner,
+ // so we must.
+ removeClientConnectionRunner(runner);
+ }
} catch( IOException ioe ) {
getLogger().error( "Exception accepting connection", ioe );
} catch( Exception e ) {
- getLogger().error( "Exception executing client connection runner: " + e.getMessage() );
+ getLogger().error( "Exception executing client connection runner: " + e.getMessage(), e );
}
}
synchronized( this ) {
serverConnectionThread = null;
+ Thread.currentThread().interrupted();
notifyAll();
}
}
@@ -269,7 +336,7 @@
* @author Peter M. Goldstein <fa...@alum.mit.edu>
*/
class ClientConnectionRunner extends AbstractLogEnabled
- implements Runnable, Component {
+ implements Component, Poolable, Runnable {
/**
* The Socket that this client connection is using for transport.
@@ -281,25 +348,10 @@
*/
private Thread clientSocketThread;
- /**
- * The ConnectionHandlerFactory that generates a ConnectionHandler for
- * this client connection.
- */
- private ConnectionHandlerFactory clientConnectionHandlerFactory;
-
- /**
- * The constructor for a ClientConnectionRunner.
- *
- * @param socket the client socket associated with this ClientConnectionRunner
- * @param handlerFactory the factory that generates ConnectionHandlers for this
- * connection
- */
- public ClientConnectionRunner(Socket socket,
- ConnectionHandlerFactory handlerFactory) {
- clientSocket = socket;
- clientConnectionHandlerFactory = handlerFactory;
+ public ClientConnectionRunner() {
+ System.out.println("Creating ClientConnectionRunner");
}
-
+
/**
* The dispose operation that terminates the runner. Should only be
* called by the ServerConnection that owns the ClientConnectionRunner
@@ -324,6 +376,15 @@
}
/**
+ * Sets the socket for a ClientConnectionRunner.
+ *
+ * @param socket the client socket associated with this ClientConnectionRunner
+ */
+ public void setSocket(Socket socket) {
+ clientSocket = socket;
+ }
+
+ /**
* Provides the body for the thread of execution dealing with a particular client
* connection. An appropriate ConnectionHandler is created, applied, executed,
* and released.
@@ -332,9 +393,8 @@
ConnectionHandler handler = null;
try {
clientSocketThread = Thread.currentThread();
- ServerConnection.this.addClientConnectionRunner(this);
- handler = clientConnectionHandlerFactory.createConnectionHandler();
+ handler = ServerConnection.this.handlerFactory.createConnectionHandler();
String connectionString = null;
if( getLogger().isDebugEnabled() ) {
connectionString = getConnectionString();
@@ -348,31 +408,41 @@
String message = "Ending " + connectionString;
getLogger().debug( message );
}
+
} catch( Exception e ) {
- getLogger().warn( "Error handling connection", e );
+ getLogger().error( "Error handling connection", e );
} finally {
-
+
// Close the underlying socket
try {
- clientSocket.close();
+ if (clientSocket != null) {
+ clientSocket.close();
+ }
} catch( IOException ioe ) {
getLogger().warn( "Error shutting down connection", ioe );
}
-
- // Release the handler and kill the reference to the handler factory
- if (handler != null) {
- clientConnectionHandlerFactory.releaseConnectionHandler( handler );
- handler = null;
- }
- clientConnectionHandlerFactory = null;
- // Remove this runner from the list of active connections.
- ServerConnection.this.removeClientConnectionRunner(this);
+ clientSocket = null;
// Null out the thread, notify other threads to encourage
// a context switch
synchronized( this ) {
clientSocketThread = null;
+
+ Thread.currentThread().interrupted();
+
+ // Release the handler and kill the reference to the handler factory
+ //
+ // This needs to be done after the clientSocketThread is nulled out,
+ // otherwise we could trash a reused ClientConnectionRunner
+ if (handler != null) {
+ ServerConnection.this.handlerFactory.releaseConnectionHandler( handler );
+ handler = null;
+ }
+
+ // Remove this runner from the list of active connections.
+ ServerConnection.this.removeClientConnectionRunner(this);
+
notifyAll();
}
}
@@ -398,6 +468,34 @@
.append(":")
.append(clientSocket.getPort());
return connectionBuffer.toString();
+ }
+ }
+
+ /**
+ * The factory for producing ClientConnectionRunner objects.
+ */
+ private class ClientConnectionRunnerFactory
+ implements ObjectFactory {
+
+ /**
+ * @see org.apache.avalon.excalibur.pool.ObjectFactory#newInstance()
+ */
+ public Object newInstance() throws Exception {
+ return new ClientConnectionRunner();
+ }
+
+ /**
+ * @see org.apache.avalon.excalibur.pool.ObjectFactory#getCreatedClass()
+ */
+ public Class getCreatedClass() {
+ return ClientConnectionRunner.class;
+ }
+
+ /**
+ * @see org.apache.avalon.excalibur.pool.ObjectFactory#decommission(Object)
+ */
+ public void decommission( Object object ) throws Exception {
+ return;
}
}
}
1.2 +61 -9 jakarta-james/src/java/org/apache/james/util/connection/SimpleConnectionManager.java
Index: SimpleConnectionManager.java
===================================================================
RCS file: /home/cvs/jakarta-james/src/java/org/apache/james/util/connection/SimpleConnectionManager.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SimpleConnectionManager.java 7 Oct 2002 07:16:46 -0000 1.1
+++ SimpleConnectionManager.java 26 Oct 2002 04:02:55 -0000 1.2
@@ -15,6 +15,7 @@
import org.apache.avalon.cornerstone.services.connection.ConnectionHandlerFactory;
import org.apache.avalon.cornerstone.services.connection.ConnectionManager;
import org.apache.avalon.cornerstone.services.threads.ThreadManager;
+import org.apache.avalon.framework.activity.Initializable;
import org.apache.avalon.framework.component.ComponentException;
import org.apache.avalon.framework.component.ComponentManager;
import org.apache.avalon.framework.component.Composable;
@@ -154,12 +155,14 @@
* @param socket the ServerSocket from which to
* @param handlerFactory the factory from which to acquire handlers
* @param threadPool the thread pool to use
+ * @param maxOpenConnections the maximum number of open connections allowed for this server socket.
* @exception Exception if an error occurs
*/
public void connect( String name,
ServerSocket socket,
ConnectionHandlerFactory handlerFactory,
- ThreadPool threadPool )
+ ThreadPool threadPool,
+ int maxOpenConnections )
throws Exception {
if (disposed) {
@@ -169,15 +172,40 @@
throw new IllegalArgumentException( "Connection already exists with name " +
name );
}
-
- ServerConnection runner = new ServerConnection(socket, handlerFactory, threadPool, timeout, maxOpenConn);
+ if (maxOpenConnections < 0) {
+ throw new IllegalArgumentException( "The maximum number of client connections per server socket cannot be less that zero.");
+ }
+ ServerConnection runner = new ServerConnection(socket, handlerFactory, threadPool, timeout, maxOpenConnections);
setupLogger( runner );
+ if (runner instanceof Initializable) {
+ ((Initializable)runner).initialize();
+ }
connectionMap.put( name, runner );
threadPool.execute(runner);
}
/**
* Start managing a connection.
+ * Management involves accepting connections and farming them out to threads
+ * from pool to be handled.
+ *
+ * @param name the name of connection
+ * @param socket the ServerSocket from which to accept connections
+ * @param handlerFactory the factory from which to acquire handlers
+ * @param threadPool the thread pool to use
+ * @exception Exception if an error occurs
+ */
+ public void connect( String name,
+ ServerSocket socket,
+ ConnectionHandlerFactory handlerFactory,
+ ThreadPool threadPool )
+ throws Exception {
+
+ connect(name, socket, handlerFactory, threadPool, maxOpenConn);
+ }
+
+ /**
+ * Start managing a connection.
* This is similar to other connect method except that it uses default thread pool.
*
* @param name the name of connection
@@ -188,12 +216,28 @@
public void connect( String name,
ServerSocket socket,
ConnectionHandlerFactory handlerFactory )
- throws Exception
- {
+ throws Exception {
connect( name, socket, handlerFactory, threadManager.getDefaultThreadPool() );
}
-
+ /**
+ * Start managing a connection.
+ * This is similar to other connect method except that it uses default thread pool.
+ *
+ * @param name the name of connection
+ * @param socket the ServerSocket from which to accept connections
+ * @param handlerFactory the factory from which to acquire handlers
+ * @param maxOpenConnections the maximum number of open connections allowed for this server socket.
+ * @exception Exception if an error occurs
+ */
+ public void connect( String name,
+ ServerSocket socket,
+ ConnectionHandlerFactory handlerFactory,
+ int maxOpenConnections )
+ throws Exception {
+ connect( name, socket, handlerFactory, threadManager.getDefaultThreadPool(), maxOpenConnections );
+ }
+
/**
* This shuts down all handlers and socket, waiting for each to gracefully shutdown.
*
@@ -202,7 +246,6 @@
*/
public void disconnect( final String name )
throws Exception {
-
disconnect( name, false );
}
@@ -225,8 +268,17 @@
name );
}
- // TODO: deal with tear down
+ // TODO: deal with tear down parameter
connection.dispose();
}
-
+
+ /**
+ * Returns the default maximum number of open connections supported by this
+ * SimpleConnectionManager
+ *
+ * @return the maximum number of connections
+ */
+ public int getMaximumNumberOfOpenConnections() {
+ return maxOpenConn;
+ }
}
--
To unsubscribe, e-mail: <ma...@jakarta.apache.org>
For additional commands, e-mail: <ma...@jakarta.apache.org>