You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by re...@apache.org on 2004/10/02 01:46:57 UTC
cvs commit: jakarta-tomcat-connectors/http11/src/java/org/apache/coyote/http11 Http11Protocol.java
remm 2004/10/01 16:46:57
Modified: util/java/org/apache/tomcat/util/net PoolTcpEndpoint.java
http11/src/java/org/apache/coyote/http11 Http11Protocol.java
Added: util/java/org/apache/tomcat/util/net
LeaderFollowerWorkerThread.java
MasterSlaveWorkerThread.java
Log:
- Hack in an alternate thread pool strategy (which is basically the TC 4.0 thread pool).
- The problem is that there are some environments where the default thread pool doesn't work well, and there's some black magic
involved with it.
- Another advantage of this alternate thread pool is that its simple design allows server socket restart to mostly work (at least in TC 4.0 it worked), so
I think this is a good endpoint for not-that-stable VM/OS combinations. A drawback is that this thread pool won't scale back (OTOH, scaling
back is dangerous as it could cause memory leaks depending on what the application is doing).
- I think in the future we could try to use the Java 5 thread pool (although for now, I haven't figured out a way to use it efficiently with
our stuff).
- Similarly, I haven't found a way to use ThreadPool efficiently with a dedicated socket listener thread, so the code is in PoolTcpEndpoint.
- From an efficiency standpoint, there's no measurable difference between the two thread pools on a 1 CPU machine (no big surprise).
- The default obviously remains the current thread pool.
Revision Changes Path
1.39 +227 -159 jakarta-tomcat-connectors/util/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java
Index: PoolTcpEndpoint.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-connectors/util/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java,v
retrieving revision 1.38
retrieving revision 1.39
diff -u -r1.38 -r1.39
--- PoolTcpEndpoint.java 13 Jul 2004 09:43:59 -0000 1.38
+++ PoolTcpEndpoint.java 1 Oct 2004 23:46:57 -0000 1.39
@@ -24,6 +24,8 @@
import java.net.Socket;
import java.net.SocketException;
import java.security.AccessControlException;
+import java.util.Stack;
+import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -54,7 +56,9 @@
* @author Gal Shachor [shachor@il.ibm.com]
* @author Yoav Shapira <yo...@apache.org>
*/
-public class PoolTcpEndpoint { // implements Endpoint {
+public class PoolTcpEndpoint implements Runnable { // implements Endpoint {
+
+ static Log log=LogFactory.getLog(PoolTcpEndpoint.class );
private StringManager sm =
StringManager.getManager("org.apache.tomcat.util.net.res");
@@ -64,33 +68,46 @@
private final Object threadSync = new Object();
- private boolean isPool = true;
-
private int backlog = BACKLOG;
private int serverTimeout = TIMEOUT;
- TcpConnectionHandler handler;
-
private InetAddress inet;
private int port;
private ServerSocketFactory factory;
private ServerSocket serverSocket;
- ThreadPoolRunnable listener;
private volatile boolean running = false;
private volatile boolean paused = false;
private boolean initialized = false;
private boolean reinitializing = false;
static final int debug=0;
- ThreadPool tp;
-
- static Log log=LogFactory.getLog(PoolTcpEndpoint.class );
-
protected boolean tcpNoDelay=false;
protected int linger=100;
protected int socketTimeout=-1;
+ private boolean lf = false;
+
+
+ // ------ Leader follower fields
+
+
+ TcpConnectionHandler handler;
+ ThreadPoolRunnable listener;
+ ThreadPool tp;
+
+
+ // ------ Master slave fields
+
+ /* The background thread. */
+ private Thread thread = null;
+ /* Available processors. */
+ private Stack workerThreads = new Stack();
+ private int curThreads = 0;
+ private int maxThreads = 20;
+ /* All processors which have been created. */
+ private Vector created = new Vector();
+
public PoolTcpEndpoint() {
tp = new ThreadPool();
@@ -102,14 +119,6 @@
// -------------------- Configuration --------------------
- public void setPoolOn(boolean isPool) {
- this.isPool = isPool;
- }
-
- public boolean isPoolOn() {
- return isPool;
- }
-
public void setMaxThreads(int maxThreads) {
if( maxThreads > 0)
tp.setMaxThreads(maxThreads);
@@ -247,13 +256,37 @@
serverTimeout=i;
}
+ public String getStrategy() {
+ if (lf) {
+ return "lf";
+ } else {
+ return "ms";
+ }
+ }
+
+ public void setStrategy(String strategy) {
+ if ("ms".equals(strategy)) {
+ lf = false;
+ } else {
+ lf = true;
+ }
+ }
+
+ public int getCurrentThreadCount() {
+ return curThreads;
+ }
+
+ public int getCurrentThreadsBusy() {
+ return curThreads - workerThreads.size();
+ }
+
// -------------------- Public methods --------------------
public void initEndpoint() throws IOException, InstantiationException {
- try {
- if(factory==null)
- factory=ServerSocketFactory.getDefault();
- if(serverSocket==null) {
+ try {
+ if(factory==null)
+ factory=ServerSocketFactory.getDefault();
+ if(serverSocket==null) {
try {
if (inet == null) {
serverSocket = factory.createSocket(port, backlog);
@@ -263,34 +296,33 @@
} catch ( BindException be ) {
throw new BindException(be.getMessage() + ":" + port);
}
- }
+ }
if( serverTimeout >= 0 )
- serverSocket.setSoTimeout( serverTimeout );
- } catch( IOException ex ) {
- // log("couldn't start endpoint", ex, Logger.DEBUG);
+ serverSocket.setSoTimeout( serverTimeout );
+ } catch( IOException ex ) {
throw ex;
- } catch( InstantiationException ex1 ) {
- // log("couldn't start endpoint", ex1, Logger.DEBUG);
+ } catch( InstantiationException ex1 ) {
throw ex1;
- }
+ }
initialized = true;
}
-
+
public void startEndpoint() throws IOException, InstantiationException {
if (!initialized) {
initEndpoint();
}
- if(isPool) {
- tp.start();
- }
- running = true;
+ if (lf) {
+ tp.start();
+ }
+ running = true;
paused = false;
- if(isPool) {
- listener = new TcpWorkerThread(this);
+ if (lf) {
+ listener = new LeaderFollowerWorkerThread(this);
tp.runIt(listener);
} else {
- log.error("XXX Error - need pool !");
- }
+ maxThreads = getMaxThreads();
+ threadStart();
+ }
}
public void pauseEndpoint() {
@@ -307,13 +339,17 @@
}
public void stopEndpoint() {
- if (running) {
- tp.shutdown();
- running = false;
+ if (running) {
+ if (lf) {
+ tp.shutdown();
+ } else {
+ threadStop();
+ }
+ running = false;
if (serverSocket != null) {
closeServerSocket();
}
- }
+ }
}
protected void closeServerSocket() {
@@ -456,33 +492,6 @@
return accepted;
}
- /** @deprecated
- */
- public void log(String msg)
- {
- log.info(msg);
- }
-
- /** @deprecated
- */
- public void log(String msg, Throwable t)
- {
- log.error( msg, t );
- }
-
- /** @deprecated
- */
- public void log(String msg, int level)
- {
- log.info( msg );
- }
-
- /** @deprecated
- */
- public void log(String msg, Throwable t, int level) {
- log.error( msg, t );
- }
-
void setSocketOptions(Socket socket)
throws SocketException {
if(linger >= 0 )
@@ -493,120 +502,179 @@
socket.setSoTimeout( socketTimeout );
}
-}
+
+ void processSocket(Socket s, TcpConnection con, Object[] threadData) {
+ // Process the connection
+ int step = 1;
+ try {
+
+ // 1: Set socket options: timeout, linger, etc
+ setSocketOptions(s);
+
+ // 2: SSL handshake
+ step = 2;
+ if (getServerSocketFactory() != null) {
+ getServerSocketFactory().handshake(s);
+ }
+
+ // 3: Process the connection
+ step = 3;
+ con.setEndpoint(this);
+ con.setSocket(s);
+ getConnectionHandler().processConnection(con, threadData);
+
+ } catch (SocketException se) {
+ PoolTcpEndpoint.log.error(
+ "Remote Host "
+ + s.getInetAddress()
+ + " SocketException: "
+ + se.getMessage());
+ // Try to close the socket
+ try {
+ s.close();
+ } catch (IOException e) {
+ }
+ } catch (Throwable t) {
+ if (step == 2) {
+ PoolTcpEndpoint.log.debug("Handshake failed", t);
+ } else {
+ PoolTcpEndpoint.log.error("Unexpected error", t);
+ }
+ // Try to close the socket
+ try {
+ s.close();
+ } catch (IOException e) {
+ }
+ } finally {
+ if (con != null) {
+ con.recycle();
+ }
+ }
+ }
+
-// -------------------- Threads --------------------
+ // -------------------------------------------------- Master Slave Methods
-/*
- * I switched the threading model here.
- *
- * We used to have a "listener" thread and a "connection"
- * thread, this results in code simplicity but also a needless
- * thread switch.
- *
- * Instead I am now using a pool of threads, all the threads are
- * simmetric in their execution and no thread switch is needed.
- */
-class TcpWorkerThread implements ThreadPoolRunnable {
- /* This is not a normal Runnable - it gets attached to an existing
- thread, runs and when run() ends - the thread keeps running.
- It's better to keep the name ThreadPoolRunnable - avoid confusion.
- We also want to use per/thread data and avoid sync wherever possible.
- */
- PoolTcpEndpoint endpoint;
+ /**
+ * Create (or allocate) and return an available processor for use in
+ * processing a specific HTTP request, if possible. If the maximum
+ * allowed processors have already been created and are in use, return
+ * <code>null</code> instead.
+ */
+ private MasterSlaveWorkerThread createWorkerThread() {
+
+ synchronized (workerThreads) {
+ if (workerThreads.size() > 0) {
+ return ((MasterSlaveWorkerThread) workerThreads.pop());
+ }
+ if ((maxThreads > 0) && (curThreads < maxThreads)) {
+ return (newWorkerThread());
+ } else {
+ if (maxThreads < 0) {
+ return (newWorkerThread());
+ } else {
+ return (null);
+ }
+ }
+ }
+
+ }
+
- public TcpWorkerThread(PoolTcpEndpoint endpoint) {
- this.endpoint = endpoint;
+ /**
+ * Create and return a new processor suitable for processing HTTP
+ * requests and returning the corresponding responses.
+ */
+ private MasterSlaveWorkerThread newWorkerThread() {
+
+ MasterSlaveWorkerThread workerThread =
+ new MasterSlaveWorkerThread(this, tp.getName() + "-" + (++curThreads));
+ workerThread.start();
+ created.addElement(workerThread);
+ return (workerThread);
+
}
- public Object[] getInitData() {
- // no synchronization overhead, but 2 array access
- Object obj[]=new Object[2];
- obj[1]= endpoint.getConnectionHandler().init();
- obj[0]=new TcpConnection();
- return obj;
+
+ /**
+ * Recycle the specified Processor so that it can be used again.
+ *
+ * @param processor The processor to be recycled
+ */
+ void recycleWorkerThread(MasterSlaveWorkerThread workerThread) {
+ workerThreads.push(workerThread);
}
+
- public void runIt(Object perThrData[]) {
+ /**
+ * The background thread that listens for incoming TCP/IP connections and
+ * hands them off to an appropriate processor.
+ */
+ public void run() {
- // Create per-thread cache
- if (endpoint.isRunning()) {
+ // Loop until we receive a shutdown command
+ while (running) {
// Loop if endpoint is paused
- while (endpoint.isPaused()) {
+ while (paused) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore
}
}
-
- // Accept a new connection
- Socket s = null;
- try {
- s = endpoint.acceptSocket();
- } finally {
- // Continue accepting on another thread...
- if (endpoint.isRunning()) {
- endpoint.tp.runIt(this);
+
+ // Accept the next incoming connection from the server socket
+ Socket socket = acceptSocket();
+
+ // Hand this socket off to an appropriate processor
+ MasterSlaveWorkerThread workerThread = createWorkerThread();
+ if (workerThread == null) {
+ try {
+ log.warn(sm.getString("endpoint.noProcessor"));
+ socket.close();
+ } catch (IOException e) {
+ ;
}
+ continue;
}
+ workerThread.assign(socket);
- // Process the connection
- if (null != s) {
- TcpConnection con = null;
- int step = 1;
- try {
+ // The processor will recycle itself when it finishes
- // 1: Set socket options: timeout, linger, etc
- endpoint.setSocketOptions(s);
+ }
- // 2: SSL handshake
- step = 2;
- if (endpoint.getServerSocketFactory() != null) {
- endpoint.getServerSocketFactory().handshake(s);
- }
+ // Notify the threadStop() method that we have shut ourselves down
+ synchronized (threadSync) {
+ threadSync.notifyAll();
+ }
- // 3: Process the connection
- step = 3;
- con = (TcpConnection) perThrData[0];
- con.setEndpoint(endpoint);
- con.setSocket(s);
- endpoint.getConnectionHandler().processConnection(
- con,
- (Object[]) perThrData[1]);
-
- } catch (SocketException se) {
- PoolTcpEndpoint.log.error(
- "Remote Host "
- + s.getInetAddress()
- + " SocketException: "
- + se.getMessage());
- // Try to close the socket
- try {
- s.close();
- } catch (IOException e) {
- }
- } catch (Throwable t) {
- if (step == 2) {
- PoolTcpEndpoint.log.debug("Handshake failed", t);
- } else {
- PoolTcpEndpoint.log.error("Unexpected error", t);
- }
- // Try to close the socket
- try {
- s.close();
- } catch (IOException e) {
- }
- } finally {
- if (con != null) {
- con.recycle();
- }
- }
- }
+ }
+
+
+ /**
+ * Start the background processing thread.
+ */
+ private void threadStart() {
+ thread = new Thread(this, tp.getName());
+ thread.setPriority(getThreadPriority());
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ /**
+ * Stop the background processing thread.
+ */
+ private void threadStop() {
+ try {
+ threadSync.wait(5000);
+ } catch (InterruptedException e) {
+ ;
}
+ thread = null;
}
-
+
+
}
1.1 jakarta-tomcat-connectors/util/java/org/apache/tomcat/util/net/LeaderFollowerWorkerThread.java
Index: LeaderFollowerWorkerThread.java
===================================================================
/*
* Copyright 1999-2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tomcat.util.net;
import java.net.Socket;
import org.apache.tomcat.util.threads.ThreadPoolRunnable;
/*
 * Note on the threading model.
 *
 * There used to be a "listener" thread and a separate "connection"
 * thread. That kept the code simple but cost a needless thread switch.
 *
 * A pool of threads is used instead: all the threads are symmetric in
 * their execution and no thread switch is needed.
 */
class LeaderFollowerWorkerThread implements ThreadPoolRunnable {

    /*
     * This is not a normal Runnable: it gets attached to an existing
     * thread, runs, and when it ends the thread keeps running. The
     * ThreadPoolRunnable name is kept to avoid confusion. Per-thread data
     * is used so that synchronization can be avoided wherever possible.
     */

    /** Endpoint used to accept sockets and to process them. */
    PoolTcpEndpoint endpoint;

    /**
     * Creates a worker bound to the given endpoint.
     *
     * @param endpoint the endpoint this worker accepts connections for
     */
    public LeaderFollowerWorkerThread(PoolTcpEndpoint endpoint) {
        this.endpoint = endpoint;
    }

    /**
     * Builds the per-thread data later handed to {@link #runIt(Object[])}.
     *
     * @return a two element array: index 0 holds a new TcpConnection,
     *         index 1 holds the value returned by the connection
     *         handler's init()
     */
    public Object[] getInitData() {
        // The handler is initialised before the connection object is
        // created, as in the original statement order.
        Object handlerData = endpoint.getConnectionHandler().init();
        return new Object[] { new TcpConnection(), handlerData };
    }

    /**
     * Accepts a single connection and processes it on the calling thread.
     * Returns immediately when the endpoint is not running.
     *
     * @param perThrData the array produced by {@link #getInitData()}
     */
    public void runIt(Object perThrData[]) {

        if (!endpoint.isRunning()) {
            return;
        }

        // Hold here, polling once per second, for as long as the endpoint
        // is paused
        while (endpoint.isPaused()) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore
            }
        }

        // Accept a new connection
        Socket accepted = null;
        try {
            accepted = endpoint.acceptSocket();
        } finally {
            // Whatever happened during accept, let another pool thread
            // continue accepting while this one handles the connection
            if (endpoint.isRunning()) {
                endpoint.tp.runIt(this);
            }
        }

        // Nothing was accepted, so there is nothing to process
        if (accepted == null) {
            return;
        }

        endpoint.processSocket(
            accepted,
            (TcpConnection) perThrData[0],
            (Object[]) perThrData[1]);
    }

}
1.1 jakarta-tomcat-connectors/util/java/org/apache/tomcat/util/net/MasterSlaveWorkerThread.java
Index: MasterSlaveWorkerThread.java
===================================================================
/*
* Copyright 1999-2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tomcat.util.net;
import java.net.Socket;
import org.apache.tomcat.util.threads.ThreadWithAttributes;
/**
 * Regular master slave thread pool worker. Each instance owns one thread
 * which waits for a socket to be handed over through
 * {@link #assign(Socket)}, processes it via the endpoint, and then hands
 * itself back to the endpoint for reuse.
 */
class MasterSlaveWorkerThread implements Runnable {

    /** Endpoint used to process sockets and to recycle this worker. */
    protected PoolTcpEndpoint endpoint;

    /** Name given to the thread created by {@link #start()}. */
    protected String threadName;

    /**
     * Set by {@link #stop()} and polled by {@link #run()}. Volatile because
     * it is written and read by different threads outside any lock.
     */
    protected volatile boolean stopped = false;

    /** Notified when {@link #run()} leaves its processing loop. */
    private final Object threadSync = new Object();

    /** The thread created by {@link #start()}; cleared by {@link #stop()}. */
    private Thread thread = null;

    /** True while {@link #socket} holds a hand-off not yet taken by await(). */
    private boolean available = false;

    /** The socket being handed from assign() to await(); may be null. */
    private Socket socket = null;

    /** Connection object reused for every socket this worker processes. */
    private final TcpConnection con = new TcpConnection();

    /** Per-thread data obtained from the connection handler in start(). */
    private Object[] threadData = null;


    /**
     * Creates a worker for the given endpoint. No thread is created until
     * {@link #start()} is called.
     *
     * @param endpoint   the owning endpoint
     * @param threadName the name for this worker's thread
     */
    public MasterSlaveWorkerThread(PoolTcpEndpoint endpoint, String threadName) {
        this.endpoint = endpoint;
        this.threadName = threadName;
    }


    /**
     * Hands a socket over to this worker's thread. Blocks while a
     * previously assigned socket has not yet been picked up.
     * <b>NOTE</b>: This method is called from another thread (the
     * endpoint's accept loop, or {@link #stop()} with <code>null</code>);
     * the socket is processed on this worker's own thread so that multiple
     * simultaneous requests can be handled.
     *
     * @param socket TCP socket to process, or <code>null</code> to wake the
     *               worker without giving it work
     */
    synchronized void assign(Socket socket) {

        // Wait for the worker to take the previous Socket
        while (available) {
            try {
                wait();
            } catch (InterruptedException e) {
                // Ignored: keep waiting until the previous hand-off is taken
            }
        }

        // Store the newly available Socket and notify our thread
        this.socket = socket;
        available = true;
        notifyAll();

    }


    /**
     * Waits for the next hand-off made through {@link #assign(Socket)}.
     *
     * @return the assigned socket, or <code>null</code> if
     *         <code>null</code> was assigned (as {@link #stop()} does)
     */
    private synchronized Socket await() {

        // Wait for a new hand-off to be made
        while (!available) {
            try {
                wait();
            } catch (InterruptedException e) {
                // Ignored: keep waiting for a hand-off
            }
        }

        // Take the Socket and let a blocked assign() proceed
        Socket socket = this.socket;
        available = false;
        notifyAll();

        return (socket);

    }


    /**
     * Worker loop: repeatedly waits for an assigned socket, has the
     * endpoint process it, and then recycles this worker with the endpoint.
     * The loop ends once {@link #stop()} has been called.
     */
    public void run() {

        // Take one snapshot of the per-thread data. start() assigns it
        // before starting the thread, while stop() may null the field at
        // any moment; reading the field for each socket could therefore
        // pass null for a socket that was already handed over.
        final Object[] data = threadData;

        // Process requests until we receive a shutdown signal
        while (!stopped) {

            // Wait for the next socket to be assigned
            Socket socket = await();
            if (socket == null)
                continue;

            // Process the request from this socket
            endpoint.processSocket(socket, con, data);

            // Finish up this request
            endpoint.recycleWorkerThread(this);

        }

        // Signal that the loop has ended. Nothing in this class waits on
        // threadSync.
        synchronized (threadSync) {
            threadSync.notifyAll();
        }

    }


    /**
     * Starts the worker: obtains the per-thread data from the endpoint's
     * connection handler, then creates and starts a daemon thread running
     * this object.
     */
    public void start() {
        threadData = endpoint.getConnectionHandler().init();
        thread = new ThreadWithAttributes(null, this);
        thread.setName(threadName);
        thread.setDaemon(true);
        thread.start();
    }


    /**
     * Asks the worker to stop. Sets the stop flag, assigns a
     * <code>null</code> socket so a waiting worker wakes up, and drops the
     * references to the thread and the per-thread data. This call blocks
     * in {@link #assign(Socket)} while an earlier hand-off is still
     * pending; it does not wait for the worker thread to finish.
     */
    public void stop() {
        stopped = true;
        assign(null);
        thread = null;
        threadData = null;
    }

}
1.58 +16 -11 jakarta-tomcat-connectors/http11/src/java/org/apache/coyote/http11/Http11Protocol.java
Index: Http11Protocol.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-connectors/http11/src/java/org/apache/coyote/http11/Http11Protocol.java,v
retrieving revision 1.57
retrieving revision 1.58
diff -u -r1.57 -r1.58
--- Http11Protocol.java 29 Sep 2004 09:54:28 -0000 1.57
+++ Http11Protocol.java 1 Oct 2004 23:46:57 -0000 1.58
@@ -154,8 +154,13 @@
// XXX It should be possible to use a single TP
tpOname=new ObjectName
(domain + ":" + "type=ThreadPool,name=" + getName());
- Registry.getRegistry(null, null)
- .registerComponent(tp, tpOname, null );
+ if ("ms".equals(getStrategy())) {
+ Registry.getRegistry(null, null)
+ .registerComponent(ep, tpOname, null );
+ } else {
+ Registry.getRegistry(null, null)
+ .registerComponent(tp, tpOname, null );
+ }
tp.setName(getName());
tp.setDaemon(false);
tp.addThreadPoolListener(new MXPoolListener(this, tp));
@@ -242,15 +247,6 @@
// -------------------- Pool setup --------------------
- public boolean getPools(){
- return ep.isPoolOn();
- }
-
- public void setPools( boolean t ) {
- ep.setPoolOn(t);
- setAttribute("pools", "" + t);
- }
-
public int getMaxThreads() {
return ep.getMaxThreads();
}
@@ -286,6 +282,15 @@
public int getThreadPriority() {
return ep.getThreadPriority();
}
+
+ public void setStrategy(String strategy) {
+ ep.setStrategy(strategy);
+ setAttribute("strategy", strategy);
+ }
+
+ public String getStrategy() {
+ return ep.getStrategy();
+ }
// -------------------- Tcp setup --------------------
---------------------------------------------------------------------
To unsubscribe, e-mail: tomcat-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: tomcat-dev-help@jakarta.apache.org