You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by co...@apache.org on 2005/11/24 06:20:54 UTC
svn commit: r348655 -
/tomcat/sandbox/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java
Author: costin
Date: Wed Nov 23 21:20:50 2005
New Revision: 348655
URL: http://svn.apache.org/viewcvs?rev=348655&view=rev
Log:
Refactoring - leave only the core methods, no ThreadPool or specific
code.
Modified:
tomcat/sandbox/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java
Modified: tomcat/sandbox/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java
URL: http://svn.apache.org/viewcvs/tomcat/sandbox/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java?rev=348655&r1=348654&r2=348655&view=diff
==============================================================================
--- tomcat/sandbox/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java (original)
+++ tomcat/sandbox/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java Wed Nov 23 21:20:50 2005
@@ -17,21 +17,17 @@
package org.apache.tomcat.util.net;
import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
-import java.security.AccessControlException;
-import java.util.Stack;
-import java.util.Vector;
+import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tomcat.util.res.StringManager;
import org.apache.tomcat.util.threads.ThreadPool;
-import org.apache.tomcat.util.threads.ThreadPoolRunnable;
+import org.apache.tomcat.util.threads.ThreadPool.ThreadPoolListener;
/* Similar to the MPM module in Apache 2.0. Handles all the details related to
"tcp server" functionality - thread management, accept policy, etc.
@@ -60,98 +56,123 @@
static Log log=LogFactory.getLog(PoolTcpEndpoint.class );
- private StringManager sm =
+ protected StringManager sm =
StringManager.getManager("org.apache.tomcat.util.net.res");
- private static final int BACKLOG = 100;
- private static final int TIMEOUT = 1000;
+ protected static final int BACKLOG = 100;
+ protected static final int TIMEOUT = 1000;
- private final Object threadSync = new Object();
+ protected int backlog = BACKLOG;
+ protected int serverTimeout = TIMEOUT;
- private int backlog = BACKLOG;
- private int serverTimeout = TIMEOUT;
+ protected InetAddress inet;
+ protected int port;
- private InetAddress inet;
- private int port;
+ protected ServerSocket serverSocket;
- private ServerSocketFactory factory;
- private ServerSocket serverSocket;
-
- private volatile boolean running = false;
- private volatile boolean paused = false;
- private boolean initialized = false;
- private boolean reinitializing = false;
- static final int debug=0;
+ protected volatile boolean running = false;
+ protected volatile boolean paused = false;
+ protected boolean initialized = false;
+ protected boolean reinitializing = false;
protected boolean tcpNoDelay=false;
protected int linger=100;
protected int socketTimeout=-1;
- private boolean lf = true;
-
// ------ Leader follower fields
TcpConnectionHandler handler;
- ThreadPoolRunnable listener;
- ThreadPool tp;
+ // ------ Master slave fields
+ protected int curThreads = 0;
+ protected int maxThreads = 20;
+ protected int maxSpareThreads = 20;
+ protected int minSpareThreads = 20;
+ protected String type;
+
+ protected String name = "EP"; // base name for threads
- // ------ Master slave fields
+ protected int threadPriority;
- /* The background thread. */
- private Thread thread = null;
- /* Available processors. */
- private Stack workerThreads = new Stack();
- private int curThreads = 0;
- private int maxThreads = 20;
- /* All processors which have been created. */
- private Vector created = new Vector();
+ protected boolean daemon = true;
+ private ArrayList listeners = new ArrayList();
public PoolTcpEndpoint() {
- tp = new ThreadPool();
}
- public PoolTcpEndpoint( ThreadPool tp ) {
- this.tp=tp;
+    public static PoolTcpEndpoint getEndpoint(String type) { // factory: picks the endpoint implementation for a strategy name
+        String cn = null; // implementation class name; stays null for an unrecognized (or null) type
+        if( "apr".equals( type )) {
+            cn = "org.apache.tomcat.util.net.AprEndpoint";
+        }
+        if( "lf".equals( type )) {
+            cn = "org.apache.tomcat.util.net.LeaderFollowerEndpoint";
+        }
+        if( "acc".equals( type )) {
+            cn = "org.apache.tomcat.util.net.AcceptorEndpoint";
+        }
+        if( "ms".equals( type )) {
+            cn = "org.apache.tomcat.util.net.MasterSlaveEndpoint";
+        }
+        PoolTcpEndpoint res = null;
+        if( cn != null ) {
+            try {
+                Class c = Class.forName( cn ); // loaded reflectively by name rather than referenced directly
+                res = (PoolTcpEndpoint)c.newInstance();
+            } catch( Throwable t ) {
+                throw new RuntimeException("Can't create endpoint " + cn, t); // keep the cause so the real failure is diagnosable
+            }
+        }
+        if( res == null ) {
+            res = new SimpleEndpoint(); // fallback when no known type matched
+        }
+        res.type = type; // records the requested name as given, even when the fallback was used
+        return res;
     }
// -------------------- Configuration --------------------
+
+ public String getName() {
+ return name;
+ }
+ public void setName(String name) {
+ this.name = name;
+ }
+
+
public void setMaxThreads(int maxThreads) {
- if( maxThreads > 0)
- tp.setMaxThreads(maxThreads);
+ this.maxThreads = maxThreads;
}
public int getMaxThreads() {
- return tp.getMaxThreads();
+ return maxThreads;
}
public void setMaxSpareThreads(int maxThreads) {
- if(maxThreads > 0)
- tp.setMaxSpareThreads(maxThreads);
+ this.maxSpareThreads = maxThreads;
}
public int getMaxSpareThreads() {
- return tp.getMaxSpareThreads();
+ return maxSpareThreads;
}
public void setMinSpareThreads(int minThreads) {
- if(minThreads > 0)
- tp.setMinSpareThreads(minThreads);
+ this.minSpareThreads = minThreads;
}
public int getMinSpareThreads() {
- return tp.getMinSpareThreads();
+ return minSpareThreads;
}
public void setThreadPriority(int threadPriority) {
- tp.setThreadPriority(threadPriority);
+ this.threadPriority = threadPriority;
}
public int getThreadPriority() {
- return tp.getThreadPriority();
+ return threadPriority;
}
public int getPort() {
@@ -173,15 +194,11 @@
public void setServerSocket(ServerSocket ss) {
serverSocket = ss;
}
-
- public void setServerSocketFactory( ServerSocketFactory factory ) {
- this.factory=factory;
+
+ public ServerSocket getServerSocket() {
+ return serverSocket;
}
- ServerSocketFactory getServerSocketFactory() {
- return factory;
- }
-
public void setConnectionHandler( TcpConnectionHandler handler ) {
this.handler=handler;
}
@@ -257,19 +274,11 @@
}
public String getStrategy() {
- if (lf) {
- return "lf";
- } else {
- return "ms";
- }
+ return type;
}
public void setStrategy(String strategy) {
- if ("ms".equals(strategy)) {
- lf = false;
- } else {
- lf = true;
- }
+ // shouldn't be used.
}
public int getCurrentThreadCount() {
@@ -277,83 +286,40 @@
}
public int getCurrentThreadsBusy() {
- return curThreads - workerThreads.size();
+ return curThreads;
}
// -------------------- Public methods --------------------
public void initEndpoint() throws IOException, InstantiationException {
- try {
- if(factory==null)
- factory=ServerSocketFactory.getDefault();
- if(serverSocket==null) {
- try {
- if (inet == null) {
- serverSocket = factory.createSocket(port, backlog);
- } else {
- serverSocket = factory.createSocket(port, backlog, inet);
- }
- } catch ( BindException be ) {
- throw new BindException(be.getMessage() + ":" + port);
- }
- }
- if( serverTimeout >= 0 )
- serverSocket.setSoTimeout( serverTimeout );
- } catch( IOException ex ) {
- throw ex;
- } catch( InstantiationException ex1 ) {
- throw ex1;
- }
- initialized = true;
}
public void startEndpoint() throws IOException, InstantiationException {
- if (!initialized) {
- initEndpoint();
- }
- if (lf) {
- tp.start();
- }
- running = true;
- paused = false;
- if (lf) {
- listener = new LeaderFollowerWorkerThread(this);
- tp.runIt(listener);
- } else {
- maxThreads = getMaxThreads();
- threadStart();
- }
}
public void pauseEndpoint() {
- if (running && !paused) {
- paused = true;
- unlockAccept();
- }
}
public void resumeEndpoint() {
- if (running) {
- paused = false;
- }
}
public void stopEndpoint() {
- if (running) {
- if (lf) {
- tp.shutdown();
- }
- running = false;
- if (serverSocket != null) {
- closeServerSocket();
- }
- if (!lf) {
- threadStop();
- }
- initialized=false ;
- }
}
+ protected void processSocket(Socket s, TcpConnection con,
+ Object[] threadData) {
+ }
+
+
+ /** To notify worker done, recycle
+ */
+ public void workerDone(Runnable r) {
+
+ }
+
+
+ // ---------------- Utils ----------------------
+
protected void closeServerSocket() {
if (!paused)
unlockAccept();
@@ -393,109 +359,7 @@
}
}
- // -------------------- Private methods
-
- Socket acceptSocket() {
- if( !running || serverSocket==null ) return null;
-
- Socket accepted = null;
-
- try {
- if(factory==null) {
- accepted = serverSocket.accept();
- } else {
- accepted = factory.acceptSocket(serverSocket);
- }
- if (null == accepted) {
- log.warn(sm.getString("endpoint.warn.nullSocket"));
- } else {
- if (!running) {
- accepted.close(); // rude, but unlikely!
- accepted = null;
- } else if (factory != null) {
- factory.initSocket( accepted );
- }
- }
- }
- catch(InterruptedIOException iioe) {
- // normal part -- should happen regularly so
- // that the endpoint can release if the server
- // is shutdown.
- }
- catch (AccessControlException ace) {
- // When using the Java SecurityManager this exception
- // can be thrown if you are restricting access to the
- // socket with SocketPermission's.
- // Log the unauthorized access and continue
- String msg = sm.getString("endpoint.warn.security",
- serverSocket, ace);
- log.warn(msg);
- }
- catch (IOException e) {
-
- String msg = null;
-
- if (running) {
- msg = sm.getString("endpoint.err.nonfatal",
- serverSocket, e);
- log.error(msg, e);
- }
-
- if (accepted != null) {
- try {
- accepted.close();
- } catch(Throwable ex) {
- msg = sm.getString("endpoint.err.nonfatal",
- accepted, ex);
- log.warn(msg, ex);
- }
- accepted = null;
- }
-
- if( ! running ) return null;
- reinitializing = true;
- // Restart endpoint when getting an IOException during accept
- synchronized (threadSync) {
- if (reinitializing) {
- reinitializing = false;
- // 1) Attempt to close server socket
- closeServerSocket();
- initialized = false;
- // 2) Reinit endpoint (recreate server socket)
- try {
- msg = sm.getString("endpoint.warn.reinit");
- log.warn(msg);
- initEndpoint();
- } catch (Throwable t) {
- msg = sm.getString("endpoint.err.nonfatal",
- serverSocket, t);
- log.error(msg, t);
- }
- // 3) If failed, attempt to restart endpoint
- if (!initialized) {
- msg = sm.getString("endpoint.warn.restart");
- log.warn(msg);
- try {
- stopEndpoint();
- initEndpoint();
- startEndpoint();
- } catch (Throwable t) {
- msg = sm.getString("endpoint.err.fatal",
- serverSocket, t);
- log.error(msg, t);
- }
- // Current thread is now invalid: kill it
- throw new ThreadDeath();
- }
- }
- }
-
- }
-
- return accepted;
- }
-
- void setSocketOptions(Socket socket)
+ protected void setSocketOptions(Socket socket)
throws SocketException {
if(linger >= 0 )
socket.setSoLinger( true, linger);
@@ -505,179 +369,46 @@
socket.setSoTimeout( socketTimeout );
}
-
- void processSocket(Socket s, TcpConnection con, Object[] threadData) {
- // Process the connection
- int step = 1;
- try {
-
- // 1: Set socket options: timeout, linger, etc
- setSocketOptions(s);
-
- // 2: SSL handshake
- step = 2;
- if (getServerSocketFactory() != null) {
- getServerSocketFactory().handshake(s);
- }
-
- // 3: Process the connection
- step = 3;
- con.setEndpoint(this);
- con.setSocket(s);
- getConnectionHandler().processConnection(con, threadData);
-
- } catch (SocketException se) {
- log.error(sm.getString("endpoint.err.socket", s.getInetAddress()),
- se);
- // Try to close the socket
- try {
- s.close();
- } catch (IOException e) {
- }
- } catch (Throwable t) {
- if (step == 2) {
- if (log.isDebugEnabled()) {
- log.debug(sm.getString("endpoint.err.handshake"), t);
- }
- } else {
- log.error(sm.getString("endpoint.err.unexpected"), t);
- }
- // Try to close the socket
- try {
- s.close();
- } catch (IOException e) {
- }
- } finally {
- if (con != null) {
- con.recycle();
- }
- }
- }
-
-
- // -------------------------------------------------- Master Slave Methods
-
-
/**
- * Create (or allocate) and return an available processor for use in
- * processing a specific HTTP request, if possible. If the maximum
- * allowed processors have already been created and are in use, return
- * <code>null</code> instead.
+ * The background thread that listens for incoming TCP/IP connections and
+ * hands them off to an appropriate processor.
*/
- private MasterSlaveWorkerThread createWorkerThread() {
-
- synchronized (workerThreads) {
- if (workerThreads.size() > 0) {
- return ((MasterSlaveWorkerThread) workerThreads.pop());
- }
- if ((maxThreads > 0) && (curThreads < maxThreads)) {
- return (newWorkerThread());
- } else {
- if (maxThreads < 0) {
- return (newWorkerThread());
- } else {
- return (null);
- }
- }
- }
-
+ public void run() {
}
-
- /**
- * Create and return a new processor suitable for processing HTTP
- * requests and returning the corresponding responses.
- */
- private MasterSlaveWorkerThread newWorkerThread() {
-
- MasterSlaveWorkerThread workerThread =
- new MasterSlaveWorkerThread(this, tp.getName() + "-" + (++curThreads));
- workerThread.start();
- created.addElement(workerThread);
- return (workerThread);
-
+ public void setSSLSupport(boolean secure, String socketFactoryName) throws Exception {
}
-
- /**
- * Recycle the specified Processor so that it can be used again.
- *
- * @param processor The processor to be recycled
- */
- void recycleWorkerThread(MasterSlaveWorkerThread workerThread) {
- workerThreads.push(workerThread);
+ public void setDaemon(boolean b) {
+ daemon=b;
}
-
- /**
- * The background thread that listens for incoming TCP/IP connections and
- * hands them off to an appropriate processor.
- */
- public void run() {
-
- // Loop until we receive a shutdown command
- while (running) {
-
- // Loop if endpoint is paused
- while (paused) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- // Ignore
- }
- }
-
- // Allocate a new worker thread
- MasterSlaveWorkerThread workerThread = createWorkerThread();
- if (workerThread == null) {
- try {
- // Wait a little for load to go down: as a result,
- // no accept will be made until the concurrency is
- // lower than the specified maxThreads, and current
- // connections will wait for a little bit instead of
- // failing right away.
- Thread.sleep(100);
- } catch (InterruptedException e) {
- // Ignore
- }
- continue;
- }
-
- // Accept the next incoming connection from the server socket
- Socket socket = acceptSocket();
-
- // Hand this socket off to an appropriate processor
- workerThread.assign(socket);
-
- // The processor will recycle itself when it finishes
-
- }
-
- // Notify the threadStop() method that we have shut ourselves down
- synchronized (threadSync) {
- threadSync.notifyAll();
- }
-
+ public boolean getDaemon() {
+ return daemon;
}
-
- /**
- * Start the background processing thread.
- */
- private void threadStart() {
- thread = new Thread(this, tp.getName());
- thread.setPriority(getThreadPriority());
- thread.setDaemon(true);
- thread.start();
+ protected void threadStart(Thread t) {
+ for( int i=0; i<listeners.size(); i++ ) {
+ EndpointListener tpl=(EndpointListener)listeners.get(i);
+ tpl.threadStart(this, t);
+ }
}
-
- /**
- * Stop the background processing thread.
- */
- private void threadStop() {
- thread = null;
+    protected void threadEnd(Thread t) { // forwards threadEnd(this, t) to every registered EndpointListener
+        for( int i=0; i<listeners.size(); i++ ) {
+            EndpointListener tpl=(EndpointListener)listeners.get(i);
+            tpl.threadEnd(this, t); // was threadStart(): listeners never received the end callback
+        }
+    }
+
+ public void addEndpointListener(EndpointListener listener) {
+ listeners.add(listener);
}
+
+ public static interface EndpointListener {
+ public void threadStart( PoolTcpEndpoint ep, Thread t);
+ public void threadEnd( PoolTcpEndpoint ep, Thread t);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org